In [None]:
from PIL import Image
import numpy as np
import pandas as pd
import os
import math
import cv2
import tensorflow as tf
import itertools
from sklearn import preprocessing
import pickle
import matplotlib.cm as cm
import random
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix 

In [None]:
# Root of the training images; each entry found here is later used as a sub-folder name.
train_folder = 'resized_train/'
# Directory listing without the '.DS_Store' entry (when one is present).
names = [entry for entry in os.listdir(train_folder) if entry != '.DS_Store']

In [None]:
# Number of same-folder pairs and of cross-folder pairs drawn for every folder.
PAIRS_PER_WRITER = 80

positive_pairs = []
negative_pairs = []
for index, name in enumerate(names):
    print(index)
    new_path = train_folder + name + '/'
    images = [new_path + s for s in os.listdir(new_path)]

    # Positive pairs: every unordered pair of images inside this folder is a
    # candidate. random.choices draws WITH replacement, so the same pair can
    # be selected more than once, and it raises IndexError when the folder
    # holds fewer than two images (no candidates).
    candidates = [list(comb) for comb in itertools.combinations(images, 2)]
    positive_pairs.extend(random.choices(candidates, k = PAIRS_PER_WRITER))

    # Negative pairs: pick distinct other folders (random.sample raises
    # ValueError when fewer than PAIRS_PER_WRITER other folders exist) and
    # take one random image from each of them.
    other_names = [other for other in names if other != name]
    negative_folders = random.sample(other_names, PAIRS_PER_WRITER)

    neg_imgs = []
    for negative in negative_folders:
        negative_path = train_folder + negative + '/'
        neg_imgs.append(negative_path + random.sample(os.listdir(negative_path), 1)[0])

    for each in neg_imgs:
        true = random.sample(images, 1)[0]
        # Randomise which slot of the pair holds the image from this folder.
        true_slot = random.sample([0, 1], 1)[0]
        if true_slot == 1:
            negative_pairs.append([each, true])
        else:
            negative_pairs.append([true, each])

print(len(positive_pairs), len(negative_pairs))
       


In [None]:
# Display how many positive and negative pairs were collected.
len(positive_pairs), len(negative_pairs)

In [None]:
# One target per pair: 1 for each positive pair first, then 0 for each negative pair.
labels = [1] * len(positive_pairs) + [0] * len(negative_pairs)
len(labels)

In [None]:
# All pairs in one list: positives first, negatives after.
pairs = [*positive_pairs, *negative_pairs]
len(pairs)

In [None]:
# Drop both pair-list names from the namespace.
del positive_pairs, negative_pairs

In [None]:
# Two path columns built from the pair list, plus the matching 0/1 label column.
df_train = pd.DataFrame(pairs, columns = ['img1_name', 'img2_name'])
df_train['label'] = labels
print(df_train.head(4))

In [None]:
# Remove the `pairs` name from the namespace.
del pairs

In [None]:
# Shuffle all rows (no random_state is given, so the order differs on every run)
# and renumber the index from zero.
df_train = df_train.sample(frac = 1).reset_index(drop = True)
df_train.head(1)

In [None]:
# Validation pairs read from CSV; later cells use its 'img1_name', 'img2_name'
# and 'label' columns.
df_val_only = pd.read_csv('dataset/val.csv')
print(df_val_only.head(4))
# Shown as the cell result on its own; pairing it with print() in a tuple
# displayed a stray None alongside the shape.
df_val_only.shape


In [None]:
def get_feature(img1, img2, orb, bf):
    """Compute descriptors for both images of a pair.

    Parameters
    ----------
    img1, img2 : image arrays passed straight to ``orb.detectAndCompute``.
    orb : object exposing ``detectAndCompute(image, mask)`` that returns a
        ``(keypoints, descriptors)`` tuple.
    bf : unused; kept so existing callers need not change.

    Returns
    -------
    tuple
        ``(des1, des2)`` when both images yielded descriptors, otherwise
        ``([], [])`` so callers can test the result with ``len()``.
    """
    _, des1 = orb.detectAndCompute(img1, None)
    _, des2 = orb.detectAndCompute(img2, None)
    # A missing descriptor set is reported as None. Test for that directly
    # rather than relying on an AttributeError from calling .all() on None.
    if des1 is None or des2 is None:
        return [], []
    return des1, des2
 

In [None]:
def _load_pair(path1, path2):
    """Read two image files and return them as arrays with a trailing axis added."""
    arrays = []
    for path in (path1, path2):
        # The context manager guarantees the file handle is released once the
        # pixel data has been copied into the array.
        with Image.open(path) as handle:
            arrays.append(np.expand_dims(np.asarray(handle), axis = -1))
    return arrays


def over_df(df, condition):
    """Extract ORB descriptors for every image pair listed in ``df``.

    Parameters
    ----------
    df : DataFrame with 'img1_name', 'img2_name' and 'label' columns.
    condition : ``'train'`` reads the paths exactly as stored in ``df``; any
        other value prefixes them with ``'val_resize/'``.

    Returns
    -------
    ``'train'``: ``(img1_features, img2_features, label)``.
    otherwise:   ``(img1_features, img2_features, label, empty_indices,
                 empty_labels)`` where the last two record the index and
                 label of every skipped row.

    Rows are skipped when the first image has no descriptors, or when the
    second image has none (train) / at most one (otherwise).
    """
    is_train = condition == 'train'
    path_prefix = '' if is_train else 'val_resize/'
    # NOTE(review): the two modes use different skip thresholds for the second
    # image (== 0 vs <= 1); kept as-is - confirm whether that is intentional.
    min_second_descriptors = 1 if is_train else 2

    orb = cv2.ORB_create(90)
    bf = cv2.BFMatcher()
    img1_features = []
    img2_features = []
    label = []
    empty_indices = []
    empty_labels = []

    for index, row in df.iterrows():
        print(index)
        img1, img2 = _load_pair(path_prefix + row['img1_name'],
                                path_prefix + row['img2_name'])

        m_list, n_list = get_feature(img1, img2, orb, bf)
        if len(m_list) == 0 or len(n_list) < min_second_descriptors:
            if not is_train:
                empty_indices.append(index)
                empty_labels.append(row['label'])
            continue
        img1_features.append(m_list)
        img2_features.append(n_list)
        label.append(row['label'])

    if is_train:
        return img1_features, img2_features, label
    return img1_features, img2_features, label, empty_indices, empty_labels
        
# Descriptor extraction for the training pairs; rows without usable descriptors are dropped.
img1_features_train, img2_features_train, label_train = over_df(df_train, 'train')
# Same for the validation pairs; skipped rows are reported via empty_indices / empty_labels.
img1_features_val, img2_features_val, label_val, empty_indices, empty_labels = over_df(df_val_only, 'val')


In [None]:
# Kept validation pairs, skipped validation pairs, and the distinct labels among the skipped ones.
len(label_val), len(empty_labels), set(empty_labels)


In [None]:
# Padding length = longest descriptor sequence on EITHER side of a training
# pair. Measuring only the first image would let pad_sequences silently
# truncate any longer second-image sequence.
lengths = [len(each) for each in img1_features_train]
lengths.extend(len(each) for each in img2_features_train)
maxlen = max(lengths)
maxlen

In [None]:
# Pad (or cut) every descriptor sequence to exactly `maxlen` rows.
img1_features_train, img2_features_train = (
    tf.keras.utils.pad_sequences(sequences, maxlen = maxlen)
    for sequences in (img1_features_train, img2_features_train)
)

img1_features_val, img2_features_val = (
    tf.keras.utils.pad_sequences(sequences, maxlen = maxlen)
    for sequences in (img1_features_val, img2_features_val)
)


In [None]:
# Make sure features and labels of both splits are NumPy arrays.
img1_features_train, img2_features_train, label_train = (
    np.array(values)
    for values in (img1_features_train, img2_features_train, label_train)
)

img1_features_val, img2_features_val, label_val = (
    np.array(values)
    for values in (img1_features_val, img2_features_val, label_val)
)

# Shapes in the order: train img1, train img2, train labels, val img1, val img2, val labels.
tuple(
    arr.shape
    for arr in (img1_features_train, img2_features_train, label_train,
                img1_features_val, img2_features_val, label_val)
)


In [None]:
# Remember the 3-D shapes so the arrays can be restored after scaling.
(img1_features_train_shape,
 img2_features_train_shape,
 img1_features_val_shape,
 img2_features_val_shape) = (
    arr.shape
    for arr in (img1_features_train, img2_features_train,
                img1_features_val, img2_features_val)
)

# Collapse the last two axes so each sample becomes a single flat row.
img1_features_train = img1_features_train.reshape(
    (img1_features_train_shape[0], img1_features_train_shape[1] * img1_features_train_shape[2]))
img2_features_train = img2_features_train.reshape(
    (img2_features_train_shape[0], img2_features_train_shape[1] * img2_features_train_shape[2]))
img1_features_val = img1_features_val.reshape(
    (img1_features_val_shape[0], img1_features_val_shape[1] * img1_features_val_shape[2]))
img2_features_val = img2_features_val.reshape(
    (img2_features_val_shape[0], img2_features_val_shape[1] * img2_features_val_shape[2]))


In [None]:
# One StandardScaler per input side, fitted on the training rows only and then
# applied to both the training and the validation rows.
mm_scaler_m = preprocessing.StandardScaler().fit(img1_features_train)
m_train, m_val = (mm_scaler_m.transform(rows)
                  for rows in (img1_features_train, img1_features_val))

mm_scaler_n = preprocessing.StandardScaler().fit(img2_features_train)
n_train, n_val = (mm_scaler_n.transform(rows)
                  for rows in (img2_features_train, img2_features_val))


In [None]:
# Undo the flattening: give every scaled array back its saved 3-D shape.
m_train = m_train.reshape(tuple(img1_features_train_shape[:3]))
m_val = m_val.reshape(tuple(img1_features_val_shape[:3]))
n_train = n_train.reshape(tuple(img2_features_train_shape[:3]))
n_val = n_val.reshape(tuple(img2_features_val_shape[:3]))


In [None]:
# Shapes of the scaled model inputs and of the label arrays for both splits.
m_train.shape, n_train.shape, label_train.shape, m_val.shape, n_val.shape, label_val.shape


In [None]:

# One input per image of a pair: `maxlen` rows with 32 values each.
inputs_1 = tf.keras.Input(shape = (maxlen, 32))
inputs_2 = tf.keras.Input(shape = (maxlen, 32))


# Attention with inputs_1 as the query and inputs_2 as the value, followed by
# dropout, a residual connection back to inputs_1, and layer normalisation.
attention3 = tf.keras.layers.MultiHeadAttention(num_heads = 2, key_dim = 16)
output_tensor = attention3(inputs_1, inputs_2)
output_tensor = tf.keras.layers.Dropout(0.2)(output_tensor)
merge = tf.keras.layers.Add()([inputs_1, output_tensor])
merge = tf.keras.layers.LayerNormalization()(merge)
# Two-layer dense sub-block with its own residual connection and normalisation.
out = tf.keras.layers.Dense(32, activation = 'relu')(merge)
out = tf.keras.layers.Dense(32, activation = 'relu')(out)
out = tf.keras.layers.Dropout(0.2)(out)
merge = tf.keras.layers.Add()([merge, out])
merge = tf.keras.layers.LayerNormalization()(merge)

# Average over the sequence axis to get one vector per pair.
merge = tf.keras.layers.GlobalAveragePooling1D()(merge)

# Classification head: L2-regularised hidden layer, dropout, single sigmoid output.
y = tf.keras.layers.Dense(64, activation = 'relu', kernel_regularizer = tf.keras.regularizers.L2(l2=0.01))(merge)
y = tf.keras.layers.Dropout(0.3)(y)
y = tf.keras.layers.Dense(1, activation = 'sigmoid')(y)


model = tf.keras.Model([inputs_1, inputs_2], y)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
model.summary()
def step_decay(epoch):
    """Return the learning rate for ``epoch`` under a step-wise decay.

    The rate starts at 0.0005 and is multiplied by 0.09 once for every full
    block of 10 epochs, counted on ``epoch + 1``.
    """
    # NOTE(review): 0.09 shrinks the rate roughly 11x per step - confirm that
    # 0.9 was not the intended factor.
    base_rate, decay_factor, epochs_per_step = 0.0005, 0.09, 10.0
    completed_steps = math.floor((1 + epoch) / epochs_per_step)
    return base_rate * math.pow(decay_factor, completed_steps)

# Per-epoch learning-rate schedule driven by step_decay.
lrate = tf.keras.callbacks.LearningRateScheduler(step_decay)
callbacks_list = [lrate]

adam = tf.keras.optimizers.legacy.Adam(learning_rate=0.0005, amsgrad=False)

# Binary classification set-up; accuracy is tracked on both splits during fitting.
model.compile(optimizer = adam, loss = 'binary_crossentropy', metrics = ['accuracy'])
model.fit(
    x = [m_train, n_train],
    y = label_train,
    batch_size = 256,
    epochs = 100,
    validation_data = ([m_val, n_val], label_val),
    callbacks = callbacks_list,
)


In [None]:
def _standardise_descriptors(descriptors, scaler):
    """Pad one descriptor matrix to ``maxlen`` rows, scale it, return it as a batch of one."""
    padded = np.array(tf.keras.utils.pad_sequences([descriptors], maxlen = maxlen))
    batch, steps, width = padded.shape
    # The scalers were fitted on flattened rows, so flatten, transform, restore.
    flat = np.reshape(padded, (batch, steps * width))
    return np.reshape(scaler.transform(flat), (batch, steps, width))


def _read_image_array(path):
    """Load an image file as an array with a trailing axis added."""
    # The context manager releases the file handle once the pixels are copied.
    with Image.open(path) as handle:
        return np.expand_dims(np.asarray(handle), axis = -1)


def make_predictions(df):
    """Score every image pair in ``df`` with the trained model.

    Parameters
    ----------
    df : DataFrame with 'img1_name', 'img2_name' (file names relative to
        'val_resize/') and 'label' columns.

    Returns
    -------
    tuple of lists, one entry per row of ``df``:
        ``(prediction_probs, predictions, label, img1_name, img2_name)``
        where ``predictions`` is 1 when the probability is >= 0.5, else 0.

    Relies on the notebook-level ``model``, ``maxlen``, ``mm_scaler_m`` and
    ``mm_scaler_n``.
    """
    orb = cv2.ORB_create(90)
    bf = cv2.BFMatcher()
    label = []
    predictions = []
    img1_name = []
    img2_name = []
    prediction_probs = []

    for index, row in df.iterrows():
        print(index)
        img1_name.append(row['img1_name'])
        img2_name.append(row['img2_name'])
        label.append(row['label'])

        img1 = _read_image_array('val_resize/' + row['img1_name'])
        img2 = _read_image_array('val_resize/' + row['img2_name'])

        m_list, n_list = get_feature(img1, img2, orb, bf)
        if len(m_list) == 0 or len(n_list) <= 1:
            # NOTE(review): pairs without usable descriptors get a uniformly
            # random score, so the downstream metrics include coin flips and
            # vary between runs - confirm this fallback is wanted.
            prob = np.random.rand()
        else:
            m_val = _standardise_descriptors(m_list, mm_scaler_m)
            n_val = _standardise_descriptors(n_list, mm_scaler_n)
            # The model output holds one value for the single pair; use that
            # scalar rather than thresholding the whole array.
            prob = model.predict([m_val, n_val])[0][0]

        prediction_probs.append(prob)
        predictions.append(1 if prob >= 0.5 else 0)

    return prediction_probs, predictions, label, img1_name, img2_name
        
# Score every validation pair; rows without usable descriptors receive a random score inside make_predictions.
prediction_probs, predictions, label, img1_name, img2_name = make_predictions(df_val_only)


In [None]:
# Confusion matrix with `label` passed as the true values and `predictions` as the predicted ones.
confusion_matrix(label, predictions)


In [None]:
# F1 score of the thresholded predictions against the true labels.
f1_score(label, predictions)


In [None]:
# ROC AUC computed from the raw probabilities rather than the thresholded predictions.
roc_auc_score(label, prediction_probs)


In [None]:
# Persist the whole model to one file and, separately, only its weights to another.
model.save("writer_model.h5")
model.save_weights('writer_model_weights.h5')

In [None]:
# Pickle both fitted scalers.
for scaler_path, scaler in (('mm_scaler_m.pkl', mm_scaler_m),
                            ('mm_scaler_n.pkl', mm_scaler_n)):
    with open(scaler_path, 'wb') as f:
        pickle.dump(scaler, f)