In [None]:
#Imports
from keras.layers import Conv2D, Dense, Layer, Flatten, MaxPooling2D
from keras.models import Sequential
from keras.activations import sigmoid
import numpy as np
from PIL import Image
import tensorflow as tf
import RIG
import random
import os
from keras.constraints import unitnorm
import cv2

In [None]:
#Image Preprocessing Functions
img_dim = 64
def trim(img_array, img_w=img_dim, img_h=img_dim):
    """Return the centre crop of ``img_array``.

    Axis mapping (unchanged from the original interface): ``img_w`` is the
    crop extent along axis 0 and ``img_h`` the extent along axis 1. Any
    further axes (e.g. colour channels) are returned untouched.

    If the array is smaller than the requested extent along an axis, the
    whole axis is returned instead of a wrapped-around sliver, and odd
    extents yield exactly the requested number of elements.
    """
    # Clamp at 0: a negative start would be interpreted as "count from the
    # end" by the slice and silently return the wrong region.
    start0 = max(img_array.shape[0] // 2 - img_w // 2, 0)
    start1 = max(img_array.shape[1] // 2 - img_h // 2, 0)
    # start + extent (instead of centre + extent // 2) keeps odd extents exact.
    return img_array[start0:start0 + img_w, start1:start1 + img_h]
def couple(img_array1, img_array2):
    """Join two arrays along axis 0, with ``img_array1`` first."""
    ordered_pair = (img_array1, img_array2)
    return np.concatenate(ordered_pair, axis=0)
def add_margin(pil_img, top, right, bottom, left, color):
    """Return a copy of ``pil_img`` surrounded by margins filled with ``color``.

    The canvas is ``width + left + right`` by ``height + top + bottom``
    pixels, and each odd dimension is rounded up by one pixel so that both
    sides of the result are even. The source image is pasted at
    ``(left, top)``; any rounding pixel therefore ends up on the right /
    bottom edge.
    """
    width, height = pil_img.size
    new_width = width + right + left
    new_height = height + top + bottom
    # Round each dimension up independently. Creating a separate canvas per
    # odd dimension (as before) dropped the height fix whenever the width
    # was odd as well.
    new_width += new_width % 2
    new_height += new_height % 2
    result = Image.new(pil_img.mode, (new_width, new_height), color)
    result.paste(pil_img, (left, top))
    return result
def face(img):
    """Crop ``img`` to the largest frontal face found by a Haar cascade.

    ``img`` is converted with ``np.array``; the result is a new PIL image.
    When no face is detected the full image is returned. When several faces
    are detected only the one with the largest bounding box is used: the
    detections are all expressed in coordinates of the uncropped image, so
    applying them one after another (as before) cropped the wrong region.
    """
    pixels = np.array(img)
    #Change this to where your cv2 installation is located
    cv2_path = 'ml/lib/python3.10/site-packages/cv2/'
    cascade_file = os.path.join(cv2_path, 'data/haarcascade_frontalface_default.xml')
    if not os.path.isfile(cascade_file):
        # Fall back to the cascade directory shipped with the opencv-python
        # wheels, if this build exposes it.
        bundled_dir = getattr(getattr(cv2, 'data', None), 'haarcascades', None)
        if bundled_dir:
            cascade_file = os.path.join(bundled_dir, 'haarcascade_frontalface_default.xml')
    face_cascade = cv2.CascadeClassifier(cascade_file)
    faces = face_cascade.detectMultiScale(pixels, 1.1, 4)
    if len(faces) == 0:
        return Image.fromarray(pixels)
    x, y, w, h = max(faces, key=lambda box: box[2] * box[3])
    # Slices clamp at the array bounds, so a box that overhangs the image
    # edge needs no special-casing.
    return Image.fromarray(pixels[y:y + h, x:x + w])

In [None]:
#Custom Layers

class Decouple(Layer):
    """Keep only the first and last ``size`` entries along axis 1.

    The two slices are concatenated along axis 1, so the output has
    ``2 * size`` entries on that axis; everything in between is dropped.
    The layer has no weights.
    """
    def __init__(self, size=img_dim, **kwargs):
        # Forward standard Layer arguments (e.g. name) to the base class.
        super(Decouple, self).__init__(**kwargs)
        self.size = size
    def call(self, inputs):
        leading = inputs[:, :self.size]
        trailing = inputs[:, inputs.shape[1]-self.size:]
        return tf.concat([leading, trailing], 1)
    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(Decouple, self).get_config()
        config.update({"size": self.size})
        return config

In [None]:
#Populate Directory with Similars

#One query per line of celebrities.txt; leaving the `with` block closes the file
with open("celebrities.txt") as f:
    celebs = list(f)

#NOTE(review): each entry still carries its trailing newline - confirm RIG.store tolerates that
for celeb in celebs:
    RIG.store(
        query=celeb,
        dir="similars",
        quantity=2,
        store_keys=True,
        csv_path="references.csv",
        value=0,
    )

In [None]:
#Populate Directory with Dissimilars(random)

#Count the lines of each file for the name generator
length_dict = {}
for file in os.listdir("curate"):
    path = os.path.join("curate", file)
    #`with` closes every handle (previously only the last one was closed)
    with open(path) as f:
        length_dict[file] = len(f.readlines())

#Name Generator
num_names = 1000
file_names = list(length_dict)
#Derive the population size from the files instead of a hard-coded total
total_lines = sum(length_dict.values())
name_indices = random.sample(range(total_lines), min(num_names, total_lines))
name_indices.sort()
#`current` holds the lines of file i, which cover the global line numbers
#[lines_before, lines_before + len(current)). The indices are sorted, so each
#file is read at most once.
lines_before = 0
i = -1
current = []
for index in name_indices:
    while index >= lines_before + len(current):
        lines_before += len(current)
        i += 1
        with open(os.path.join("curate", file_names[i])) as f:
            current = f.readlines()
    #The first two comma-separated fields of the sampled line form the query
    RIG.store(query=current[index - lines_before].split(",")[:2], dir="similars", quantity=2, store_keys=True, csv_path="references.csv", value=1)

In [None]:
#Populate Directory with Dissimilars(celebrities)

with open("celebrities.txt") as f:
    celebs = f.readlines()

#Form Random Pairs
num_pairs = 350
#Every pair needs two distinct celebrities, hence 2*num_pairs unique indices.
#random.sample raises ValueError if the file has fewer lines than that.
name_indices = random.sample(range(len(celebs)), 2*num_pairs)
#Consume the indices two at a time: (0, 1), (2, 3), ... Stepping only up to
#num_pairs (as before) produced half of the requested pairs.
pairs = [
    [celebs[first], celebs[second]]
    for first, second in zip(name_indices[0::2], name_indices[1::2])
]

for pair in pairs:
    RIG.store_multiple(query_list=pair, dir="similars", quantity=1, store_keys=True, csv_path="references.csv", value=1)

In [None]:
#Shuffle References
with open("references.csv", "r") as f:
    lines = f.readlines()
#A final row without a line terminator would be glued to whichever row
#follows it after shuffling, so terminate every row first.
lines = [row if row.endswith("\n") else row + "\n" for row in lines]
random.shuffle(lines)
with open("references.csv", "w") as f:
    f.writelines(lines)

In [None]:
#Organizing Data

#NOTE(review): `dir` shadows the builtin of the same name; the name is kept
#in case other cells read it.
dir = "similars"
with open("references.csv") as f:
    lines = f.readlines()

def _load_face(path, min_dim=img_dim):
    """Open ``path``, crop it to the detected face and pad it with black so
    that both sides are at least ``min_dim`` pixels."""
    with Image.open(path) as raw:
        #Force three channels so every sample stacks to the same shape and
        #the (0, 0, 0) fill colour is valid for the image mode.
        img = face(raw.convert("RGB"))
    if img.size[0] < min_dim:
        pad = (min_dim-img.size[0])//2
        img = add_margin(img, 0, pad, 0, pad, (0, 0, 0))
    if img.size[1] < min_dim:
        pad = (min_dim-img.size[1])//2
        img = add_margin(img, pad, 0, pad, 0, (0, 0, 0))
    return img

train_targets = []
train_imgs = []
for line in lines:
    #Skip blank rows instead of failing on float("")
    if not line.strip():
        continue
    #Strip only the line terminator; slicing off the last character would
    #truncate the second key of a final row that has no newline.
    splitted = line.rstrip("\r\n").split(",")
    train_targets.append(float(splitted[0]))
    img1 = _load_face(f"{dir}/{splitted[1]}.jpg")
    img2 = _load_face(f"{dir}/{splitted[2]}.jpg")
    train_imgs.append(couple(trim(np.array(img1)), trim(np.array(img2))))
train_targets = np.array(train_targets)
train_imgs = np.array(train_imgs, dtype=np.float32)

In [None]:
#Model Collection

#Think of interchanging some Convolution Layers with Pooling Layers

class Models(Sequential):
    """Sequential model whose architectures are added by calling a method."""
    def __init__(self):
        super().__init__()
    def reduction(self):
        """Append the "reduction" stack: bias-free convolutions, each followed
        by a Decouple layer, then Flatten and two Dense layers.

        Shape trace for img_dim = 64 and a (128, 64, 3) input, assuming
        Conv2D's default stride and padding (TODO confirm):
        97x33 -> 66x33 -> 51x18 -> 36x18 -> 21x3 -> 6x3 -> 5x2 -> 4x2
        -> 3x1 -> 2x1 -> 2 features -> 32 -> 1.
        """
        half = img_dim//2
        quarter = img_dim//4
        stack = [
            Conv2D(10, (half, half), use_bias=False),
            Decouple(half+1),
            Conv2D(1, (quarter, quarter), use_bias=False),
            Decouple(quarter+2),
            Conv2D(1, (quarter, quarter), use_bias=False),
            Decouple(3),
            Conv2D(10, (2, 2), use_bias=False),
            Decouple(2),
            Conv2D(1, (2, 2), use_bias=False),
            Decouple(1),
            Flatten(),
            Dense(32, use_bias=False),
            Dense(1, activation=sigmoid, bias_constraint=unitnorm()),
        ]
        for layer in stack:
            self.add(layer)

In [None]:
#Training a Model
#Build a fresh "reduction" model and fit it on the coupled image pairs
model = Models()
model.reduction()
model.compile(loss="binary_crossentropy", optimizer="adam")
#use_multiprocessing was dropped: it is documented for generator / Sequence
#inputs only, so it had no effect on in-memory arrays, and newer Keras
#releases reject it in fit().
history = model.fit(x=train_imgs, y=train_targets, epochs=100, batch_size=32)

In [None]:
#Validation

#Apply the same face crop / pad / trim steps as the data-organizing cell to
#the two images being compared.
prepared = []
for path in ("1.jpg", "2.jpg"):
    with Image.open(path) as raw:
        #Three channels, matching the (..., 3) input the model is built with
        img = face(raw.convert("RGB"))
    if img.size[0] < img_dim:
        pad = (img_dim-img.size[0])//2
        img = add_margin(img, 0, pad, 0, pad, (0, 0, 0))
    if img.size[1] < img_dim:
        pad = (img_dim-img.size[1])//2
        img = add_margin(img, pad, 0, pad, 0, (0, 0, 0))
    prepared.append(img)
img1, img2 = prepared
datum = couple(trim(np.array(img1)), trim(np.array(img2)))
#Image.fromarray(datum).show()
#Batch of one, cast to float32 like the training images
model.predict(np.array([datum], dtype=np.float32))

In [None]:
#Save Model Weights
#Writes the weights of the current `model` to the path "weights" (relative to
#the working directory), replacing whatever an earlier run stored there.
#NOTE(review): newer Keras releases may require a ".weights.h5" suffix on this
#path - confirm against the installed version.
model.save_weights("weights")

In [None]:
#Load Model Weights
#The string below is a bare expression used as a note: it lists which
#architecture method goes with which weights file and img_dim.
#NOTE(review): no `reduction_alt` method is defined on Models in the cells
#shown here - confirm where it lives before relying on "weights_alt".
'''
model: "reduction", weights: "weights", img_dim=64
model: "reduction_alt", weights: "weights_alt", img_dim=64
'''
#Re-create the architecture and build it before loading the weights.
#Input shape: two img_dim x img_dim images joined along axis 0, 3 channels.
model = Models()
model.reduction()
model.build(input_shape=(None, img_dim*2, img_dim, 3))
model.load_weights("weights")