In [5]:
# Import standard dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
import re

os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

import tensorflow as tf
from tensorflow.keras.layers import Layer, Conv2D, Lambda, Dense, MaxPooling2D, Input, Flatten, Dropout, BatchNormalization, LeakyReLU, Activation, Add
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam

In [6]:
def extract_first_number(filename):
    """Return the first run of decimal digits found in ``filename`` as an int.

    The notebook uses this to turn an image file name into a numeric person
    id. ``None`` is returned when the name contains no digits at all.
    """
    first_digits = next(re.finditer(r'\d+', filename), None)
    return None if first_digits is None else int(first_digits.group())

def resize_and_pad_image(img, target_size=(64, 64)):
    """Resize ``img`` to fit inside ``target_size`` and zero-pad it to that size.

    The aspect ratio is preserved: the image is scaled so that it fits inside
    the target box and is then centred with constant zero borders.

    Args:
        img: Image array of shape (height, width) or (height, width, channels).
        target_size: ``(target_height, target_width)`` of the returned image.

    Returns:
        The resized and padded image with spatial size
        ``target_height x target_width``.

    Raises:
        ValueError: If ``img`` has a zero-sized spatial dimension.
    """
    # Only the two leading axes are spatial, so 2-D grayscale input works too.
    height, width = img.shape[:2]
    if height == 0 or width == 0:
        raise ValueError("img must have non-zero height and width")

    target_height, target_width = target_size
    aspect_ratio = width / height
    target_aspect_ratio = target_width / target_height

    if width == height and target_width == target_height:
        # Square image into a square target: a plain resize, no padding needed.
        # cv2.resize takes dsize as (width, height).
        return cv2.resize(img, (target_width, target_height))

    # Compare against the target's aspect ratio (not against 1) so that
    # non-square targets never produce a resized side larger than the target.
    if aspect_ratio > target_aspect_ratio:
        new_width = target_width
        new_height = int(target_width / aspect_ratio)
    else:
        new_width = int(target_height * aspect_ratio)
        new_height = target_height

    # Truncation can yield 0 for extreme aspect ratios; cv2.resize needs >= 1.
    new_width = min(target_width, max(1, new_width))
    new_height = min(target_height, max(1, new_height))

    resized_img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)

    # Split the leftover space as evenly as possible; the odd pixel goes to the
    # bottom / right border.
    top_pad = (target_height - new_height) // 2
    bottom_pad = target_height - new_height - top_pad
    left_pad = (target_width - new_width) // 2
    right_pad = target_width - new_width - left_pad

    padded_img = cv2.copyMakeBorder(resized_img, top_pad, bottom_pad, left_pad, right_pad, cv2.BORDER_CONSTANT, value=[0, 0, 0])

    return padded_img

## Build Test Partition

In [7]:
def load_data(path='../Market-1501-v15.09.15/bounding_box_train/',
              max_per_person=30, max_images=3000, target_size=(64, 64)):
    """Load a capped subset of the images in ``path`` together with their labels.

    Files are visited in sorted name order. The text before the first ``_`` in
    a file name is used as the person key for the per-person cap, and the first
    number in the file name is stored as the label.

    Args:
        path: Directory containing the image files.
        max_per_person: Maximum number of images kept per person key.
        max_images: Stop after this many images were loaded; ``None`` disables
            the global cap.
        target_size: ``(height, width)`` passed to ``resize_and_pad_image``.

    Returns:
        ``(data, label)``: a list of RGB images scaled by 1/255 and resized and
        padded to ``target_size``, and the list of matching labels.
    """
    data = []
    label = []
    count_per_person = {}
    counter = 0

    for file in sorted(os.listdir(path)):
        if max_images is not None and counter >= max_images:
            break

        person_id = file.split('_')[0]
        if count_per_person.get(person_id, 0) >= max_per_person:
            continue

        # os.path.join works whether or not ``path`` ends with a separator.
        img = cv2.imread(os.path.join(path, file))
        if img is None:
            # cv2.imread returns None instead of raising for files it cannot
            # decode (non-image files, corrupt images); skip those.
            continue
        count_per_person[person_id] = count_per_person.get(person_id, 0) + 1

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img / 255.0

        data.append(resize_and_pad_image(img, target_size=target_size))
        label.append(extract_first_number(file))

        counter += 1
        if counter % 1000 == 0:
            print("Verarbeitung von 1000 Bildern abgeschlossen.")

    return data, label

In [8]:
# Load the capped image subset with the default arguments of load_data():
# `images` is a list of image arrays, `labels` the matching person ids.
images, labels = load_data()

Verarbeitung von 1000 Bildern abgeschlossen.
Verarbeitung von 1000 Bildern abgeschlossen.
Verarbeitung von 1000 Bildern abgeschlossen.


In [9]:
def create_random_pairs(images, labels, num_positive_pairs=1000, num_negative_pairs=1000):
    """Sample random same-person and different-person image pairs.

    Args:
        images: Sequence of images, aligned with ``labels``.
        labels: Person id of every entry in ``images``.
        num_positive_pairs: Number of pairs showing the same person.
        num_negative_pairs: Number of pairs showing two different persons.

    Returns:
        ``(pairs, labels_out)``. ``pairs`` is an array holding the two images
        of every pair, positive pairs first. ``labels_out`` keeps the layout
        callers already rely on: the original person ids followed by one entry
        per pair (1 = same person, 0 = different persons), so the pair labels
        are ``labels_out[-(num_positive_pairs + num_negative_pairs):]``.

    Raises:
        ValueError: If positive pairs are requested but no person has at least
            two images, or negative pairs are requested with fewer than two
            distinct persons.
    """
    # Keep the identity labels separate from the pair labels. Appending the
    # 0/1 pair labels to the array that is also used for the identity lookup
    # makes ``identity == person`` match those appended entries as well.
    identity_labels = np.array(labels)
    unique_labels, image_counts = np.unique(identity_labels, return_counts=True)

    # A positive pair needs two distinct images of the same person.
    positive_candidates = unique_labels[image_counts >= 2]
    if num_positive_pairs > 0 and positive_candidates.size == 0:
        raise ValueError("positive pairs need a person with at least two images")
    if num_negative_pairs > 0 and unique_labels.size < 2:
        raise ValueError("negative pairs need at least two distinct persons")

    pairs = []
    pair_labels = []

    for _ in range(num_positive_pairs):
        person_label = np.random.choice(positive_candidates)
        first, second = np.random.choice(
            np.flatnonzero(identity_labels == person_label), 2, replace=False)
        pairs.append([images[first], images[second]])
        pair_labels.append(1)

    for _ in range(num_negative_pairs):
        person1, person2 = np.random.choice(unique_labels, 2, replace=False)
        index1 = np.random.choice(np.flatnonzero(identity_labels == person1))
        index2 = np.random.choice(np.flatnonzero(identity_labels == person2))
        pairs.append([images[index1], images[index2]])
        pair_labels.append(0)

    # An explicit integer dtype keeps the result integral even with zero pairs.
    labels_out = np.append(identity_labels, np.asarray(pair_labels, dtype=int))
    return np.array(pairs), labels_out

In [10]:
# Sample the evaluation pairs with the default pair counts.
# NOTE: this rebinds `labels` to the array returned by create_random_pairs, so
# re-running this cell without re-running the load cell feeds that returned
# array back in as the person ids.
pairs, labels = create_random_pairs(images, labels)

In [11]:
# The pair labels (1 = same person, 0 = different persons) sit at the end of
# the returned array; with the default 1000 + 1000 pairs they are the last
# 2000 entries.
pair_labels = labels[-2000:]

# Create the custom Siamese Network

In [13]:
def create_siamese_network(input_shape):
    """Build the two-input Siamese model used for pair verification.

    Both inputs go through one shared encoder: four blocks of
    Conv2D -> BatchNormalization -> LeakyReLU -> MaxPooling2D -> Dropout with
    64, 128, 256 and 256 filters, followed by Flatten and a 1024-unit dense
    layer with BatchNormalization and LeakyReLU. The element-wise absolute
    difference of the two encodings feeds a single sigmoid unit.

    Args:
        input_shape: Shape of one input image, without the batch dimension.

    Returns:
        A Keras ``Model`` taking ``[input_a, input_b]`` and returning one
        sigmoid score per pair.
    """
    left_input = Input(shape=input_shape)
    right_input = Input(shape=input_shape)

    encoder = Sequential()
    for block_index, filters in enumerate((64, 128, 256, 256)):
        # Only the very first layer declares the input shape of the encoder.
        first_layer_kwargs = {'input_shape': input_shape} if block_index == 0 else {}
        encoder.add(Conv2D(filters, (3, 3), **first_layer_kwargs))
        encoder.add(BatchNormalization())
        encoder.add(LeakyReLU(alpha=0.1))
        encoder.add(MaxPooling2D((2, 2), padding='same'))
        encoder.add(Dropout(0.1))

    # Embedding head.
    encoder.add(Flatten())
    encoder.add(Dense(1024))
    encoder.add(BatchNormalization())
    encoder.add(LeakyReLU(alpha=0.1))

    # The same encoder instance (and therefore the same weights) is applied to
    # both inputs.
    left_encoding = encoder(left_input)
    right_encoding = encoder(right_input)

    abs_difference = Lambda(lambda tensors: tf.abs(tensors[0] - tensors[1]))(
        [left_encoding, right_encoding])

    score = Dense(1, activation='sigmoid')(abs_difference)

    return Model(inputs=[left_input, right_input], outputs=score)

In [14]:
# Per-image input shape (height, width, channels); matches the 64x64 target
# size that load_data() uses by default.
input_shape = (64, 64, 3)
siamese_model = create_siamese_network(input_shape)
# Print the layer overview as a quick sanity check of the architecture.
siamese_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 64, 64, 3)]          0         []                            
                                                                                                  
 input_2 (InputLayer)        [(None, 64, 64, 3)]          0         []                            
                                                                                                  
 sequential (Sequential)     (None, 1024)                 3328128   ['input_1[0][0]',             
                                                                     'input_2[0][0]']             
                                                                                                  
 lambda (Lambda)             (None, 1024)                 0         ['sequential[0][0]',      

In [16]:
from tensorflow.keras.models import load_model

# Load the Siamese Network

In [17]:
# Load weights from an HDF5 file located next to the notebook; no cell shown
# here trains the model, so this file is presumably the output of an earlier
# training run -- confirm.
siamese_model.load_weights('./siamese_model_weights_2.h5')

In [18]:
from tensorflow.keras.optimizers import Adam

In [19]:
# Compile with a loss and an accuracy metric so that evaluate() can report
# both; the cells shown here only evaluate the model and never call fit().
siamese_model.compile(loss='binary_crossentropy',
                      optimizer=Adam(learning_rate=0.001),
                      metrics=['accuracy'])

In [20]:
# Score the Keras model on the sampled pairs: pairs[:, 0] and pairs[:, 1] are
# the first and second image of every pair, pair_labels the 0/1 targets.
validation_loss, validation_accuracy = siamese_model.evaluate([pairs[:, 0], pairs[:, 1]], pair_labels)
print(f'Validation Loss: {validation_loss}, Validation Accuracy: {validation_accuracy}')


Validation Loss: 0.4479917883872986, Validation Accuracy: 0.8939999938011169


# Convert the Siamese Network to TF-Lite
 

In [56]:
# Convert the in-memory Keras model (with the loaded weights) to a TF-Lite
# flatbuffer; convert() returns the serialized model as bytes.
converter = tf.lite.TFLiteConverter.from_keras_model(siamese_model)
tflite_model = converter.convert()

# Write the flatbuffer into the current working directory.
with open('siamese_model.tflite', 'wb') as f:
    f.write(tflite_model)

INFO:tensorflow:Assets written to: C:\Users\Emil\AppData\Local\Temp\tmp7u1cm2e_\assets


INFO:tensorflow:Assets written to: C:\Users\Emil\AppData\Local\Temp\tmp7u1cm2e_\assets


In [59]:
import tensorflow as tf

# Load the file written by the conversion cell above. The previous path
# ('./Model/siamese_model.tflite') pointed at a location that no cell in this
# notebook writes to, so a fresh run depended on a manually copied file.
tflite_model_path = 'siamese_model.tflite'

interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
# Tensors must be allocated before inputs can be set or the model invoked.
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("Eingabe-Tensoren:")
print(input_details)
print("\nAusgabe-Tensoren:")
print(output_details)

# Smoke test with one random pair. NOTE: this rebinds the notebook-level
# `input_shape` from the per-image shape to a shape with a batch axis of 1.
input_shape = (1, 64, 64, 3)
input_a_data = np.random.rand(*input_shape).astype(np.float32)
input_b_data = np.random.rand(*input_shape).astype(np.float32)

interpreter.set_tensor(input_details[0]['index'], input_a_data)
interpreter.set_tensor(input_details[1]['index'], input_b_data)

interpreter.invoke()

output = interpreter.get_tensor(output_details[0]['index'])

Eingabe-Tensoren:
[{'name': 'serving_default_input_1:0', 'index': 0, 'shape': array([ 1, 64, 64,  3]), 'shape_signature': array([-1, 64, 64,  3]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}, {'name': 'serving_default_input_2:0', 'index': 1, 'shape': array([ 1, 64, 64,  3]), 'shape_signature': array([-1, 64, 64,  3]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]

Ausgabe-Tensoren:
[{'name': 'StatefulPartitionedCall:0', 'index': 48, 'shape': array([1, 1]), 'shape_signature': array([-1,  1]), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int

# Test the TF-Lite model on the same pairs

In [75]:
# Run the TF-Lite interpreter on every pair, one pair per invocation.
predictions = []

# The pairs are cast to float32 once up front, the same dtype the smoke test
# above feeds to the interpreter.
for pair in pairs.astype(np.float32):
    # Add a leading batch axis of size 1 to each image of the pair.
    input_a_data = np.expand_dims(pair[0], axis=0)  
    input_b_data = np.expand_dims(pair[1], axis=0)


    interpreter.set_tensor(input_details[0]['index'], input_a_data)
    interpreter.set_tensor(input_details[1]['index'], input_b_data)

    interpreter.invoke()

    output = interpreter.get_tensor(output_details[0]['index'])

    # Each entry is stored exactly as returned by get_tensor (an array, not a
    # plain float).
    predictions.append(output)

In [73]:
# Decision threshold applied to the model output: scores >= 0.5 count as
# "same person".
threshold = 0.5

# Each `pred` is the array stored by the inference loop; the comparison relies
# on it holding a single value -- NOTE(review): confirm the output shape stays
# (1, 1).
binary_predictions = [1 if pred >= threshold else 0 for pred in predictions]

# Element-wise comparison against the 0/1 pair labels.
correct_predictions = np.equal(binary_predictions, pair_labels)

# Fraction of pairs classified correctly.
accuracy = np.mean(correct_predictions)
print("accuracy:", accuracy)

Genauigkeit: 0.894
