https://github.com/isakbosman/CS271P/blob/main/nbs/CS274P_Lab_3_Neural%20Network%20.ipynb

# Environment

In [None]:
# from google.colab import drive
# drive.mount('/content/gdrive')

In [None]:
# %cd /content/gdrive/MyDrive/cs271p/data/

In [None]:
# !pip install wandb
!pip install openpyxl


In [None]:
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
import glob
from tqdm import tqdm
from tensorflow import keras
from tensorflow.keras.layers import *
from tensorflow.keras.preprocessing import image
from tensorflow.keras.mixed_precision import experimental as mixed_precision


# import wandb


In [None]:
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_policy(policy)

# Data Prepare

In [None]:
train_df = pd.read_excel('../input/cs273p-data/train-pairs-updated.xlsx')
val_df = pd.read_excel('../input/cs273p-data/val-pairs-updated.xlsx')

In [None]:
train_df = train_df[['p1','p2','ptype']]
val_df = val_df[['p1','p2','ptype']]

In [None]:
train_df = train_df.loc[train_df["ptype"] != 'sibs' ].reset_index(drop=True)
val_df = val_df.loc[val_df["ptype"] != 'sib' ].reset_index(drop=True)

In [None]:
val_df

In [None]:
label = pd.unique(train_df['ptype'])
label

In [None]:
pd.unique(val_df['ptype'])

In [None]:
label_to_index = dict((name, index) for index, name in enumerate(label))
label_to_index

In [None]:
# label_to_index["sib"] = 6

In [None]:
train_length = len(train_df)
val_length = len(val_df)

In [None]:
train_list1 = []
train_list2 = []
train_label = []
val_list1 = []
val_list2 = []
val_label = []

In [None]:
for num in tqdm(range(train_length)):
    train_path1 = "../input/cs273p-data/train-faces/train-faces/"+train_df["p1"][num]+"/*.jpg"
    train_path2 = "../input/cs273p-data/train-faces/train-faces/"+train_df["p2"][num]+"/*.jpg"
    for filename1 in glob.glob(train_path1):
        for filename2 in glob.glob(train_path2):
            train_list1.append(filename1)
            train_list2.append(filename2)
            train_label.append(label_to_index.get(train_df["ptype"][num]))

In [None]:
def get_family(s):
    lst = s.split("/")
    return lst[5]

In [None]:
for num in tqdm(range(val_length)):
    val_path1 = "../input/cs273p-data/val-faces/val-faces/"+val_df["p1"][num]+"/*.jpg"
    val_path2 = "../input/cs273p-data/val-faces/val-faces/"+val_df["p2"][num]+"/*.jpg"
    for filename1 in glob.glob(val_path1):
        for filename2 in glob.glob(val_path2):
            val_list1.append(filename1)
            val_list2.append(filename2)
            val_label.append(label_to_index.get(val_df["ptype"][num]))

In [None]:
val_label[0]

In [None]:
df_label = pd.DataFrame(train_label, columns=['label'])
df_label = df_label.groupby(by=['label']).size()
df_label

In [None]:
df_val_label = pd.DataFrame(val_label, columns=['label'])
df_val_label = df_val_label.groupby(by=['label']).size()
df_val_label

In [None]:
length = len(train_label)

https://keras.io/examples/vision/siamese_network/#putting-everything-together

https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/vision/ipynb/siamese_network.ipynb#scrollTo=KSU61vgnB7Z6

In [None]:
def preprocess_image(filename):
    """
    Load the specified file as a JPEG image, preprocess it and
    resize it to the target shape.
    """
    image_string = tf.io.read_file(filename)
    image = tf.io.decode_jpeg(image_string,channels=1)
    image = tf.image.resize(image,[108,124])
    image /= 255.0
    return image


def preprocess(anchor,positive,label):
    """
    Given the filenames corresponding to the three images, load and
    preprocess them.
    """

    return (tf.concat([preprocess_image(anchor),
        preprocess_image(positive)],2),label
    )


In [None]:
BATCH_SIZE = 128

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_list1,train_list2,train_label))
val_dataset = tf.data.Dataset.from_tensor_slices((val_list1,val_list2,val_label))

In [None]:
train_dataset = train_dataset.shuffle(len(train_label),seed = 50)
val_dataset = val_dataset.shuffle(len(val_label),seed = 50)

In [None]:
list(val_dataset.take(1).as_numpy_iterator())[0]

In [None]:
train_dataset = train_dataset.map(preprocess)
val_dataset = val_dataset.map(preprocess)

In [None]:
val_dataset

In [None]:
# list(val_dataset.take(1).as_numpy_iterator())[0]

In [None]:
train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# Base Line Model(InceptionResNetV2)

In [None]:
dummy_model = tf.keras.applications.InceptionResNetV2(
    include_top=False,
    weights=('imagenet'),
    input_shape=(108,124,3),
    classes=10
)

In [None]:
def gray_weights(weights):
    for r in range(len(weights)):
        for c in range(len(weights[r])):
            weights[r][c] = np.average(weights[r][c], axis = 0)
    return weights

def get_model_len(model):
    num = 0
    for i,layer in enumerate(model.layers):
        num = max(num,i)
    return num+1

In [None]:
model_temp = tf.keras.applications.InceptionResNetV2(
    include_top=False,
    weights=None,
    input_shape=(108,124,2),
    classes=10
)

In [None]:
dummy_model.layers[1].get_weights()[0][0][0]

In [None]:
model_temp.layers[1].get_weights()[0][0][0]

In [None]:
for i,layer in enumerate(model_temp.layers):
    if (i == 1):
        weights = dummy_model.get_layer(index=i).get_weights()[0]
#         bias = dummy_model.get_layer(index=i).get_weights()[1]
        weights = gray_weights(weights)
        layer.set_weights([weights[:,:,-2:,:]])
    if(i>1 and i < get_model_len(dummy_model)):
        if (dummy_model.get_layer(index=i).get_weights()!=[]):
            weights = dummy_model.get_layer(index=i).get_weights()
#             bias = dummy_model.get_layer(index=i).get_weights()[1]
            layer.set_weights(weights)

In [None]:
model_temp.layers[1].get_weights()[0][0][0]

In [None]:

model = keras.Sequential([
  model_temp,
  GlobalAveragePooling2D(),
  Dropout(0.8),
  Dense(10,activation='softmax',dtype='float32', name='predictions')
])
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
model.summary()

In [None]:
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss',patience=10,restore_best_weights=True)

In [None]:
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath="./InceptionResNetV2.h5",
    save_weights_only=False,
    monitor='val_loss',
    mode='min',
    save_best_only=True)

In [None]:
# wandb.init()

In [None]:
baseline_model = model
history = baseline_model.fit(train_dataset,epochs=1000,validation_data=val_dataset,callbacks=[early_stop,model_checkpoint])

In [None]:
def plot_model(history):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs_range = range(len(history.history['accuracy']))
    plt.figure(figsize=(8, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')
    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.show()

In [None]:
def cm(model):
    predictions = model.predict(val_dataset)
    pred = []
    for x in predictions:
        pred.append(np.argmax(x))
    confusion = tf.math.confusion_matrix(labels=val_label, predictions=pred)
    print(confusion)

In [None]:
plot_model(history)
cm(baseline_model)

# Weighted Model

In [None]:
df_label

In [None]:
class_weight = {}

In [None]:
for i in range(10):
    class_weight[i] = (1/df_label.xs(i))*(length/10.0)

In [None]:
weighted_model = model
weighted_history = weighted_model.fit(train_dataset,epochs=1000,validation_data=val_dataset,callbacks=[early_stop,model_checkpoint],class_weight=class_weight)

In [None]:
plot_model(weighted_history)
cm(weighted_model)

# Oversampling

In [None]:
df_oversample =  pd.DataFrame(list(zip(train_list1, train_list2,train_label)),
               columns =['path1', 'path2','label'])
df_oversample

In [None]:
len(train_label)

In [None]:
class_lst = []
for i in range(10):
    class_lst.append(df_oversample[df_oversample['label'] == i].reset_index(drop=True))
    if i != 0:
        class_lst[i] = class_lst[i].sample(len(class_lst[0]), replace=True)


In [None]:
for i,class_ in enumerate(class_lst):
    if i == 0:
        ds_oversample = tf.data.Dataset.from_tensor_slices((class_["path1"],class_["path2"],class_["label"]))
    else:
        ds_oversample = ds_oversample.concatenate(tf.data.Dataset.from_tensor_slices((class_["path1"],class_["path2"],class_["label"])))


In [None]:
# len(list(train_ds_oversample))

In [None]:
train_ds_oversample = ds_oversample.shuffle(10*df_label.xs(0),seed = 50)
train_ds_oversample = train_ds_oversample.map(preprocess)
train_ds_oversample = train_ds_oversample.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
train_ds_oversample

In [None]:
oversample_model = model
oversample_history = oversample_model.fit(train_ds_oversample,epochs=1000,validation_data=val_dataset,callbacks=[early_stop,model_checkpoint])

In [None]:
plot_model(oversample_history)
cm(oversample_model)

# Augumentation

In [None]:
augmentation_model = tf.keras.Sequential([
  RandomFlip("horizontal"),
  RandomRotation(0.06),
  model
])

In [None]:
augmentation_model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
augmentation_history = augmentation_model.fit(train_ds_oversample,epochs=1000,validation_data=val_dataset,callbacks=[early_stop,model_checkpoint])

In [None]:
plot_model(augmentation_history)
cm(augmentation_model)

# Siamese Network —— Another approach

In [None]:
def siamese_preprocess(anchor,positive,ans):
    """
    Given the filenames corresponding to the three images, load and
    preprocess them.
    """

    return ((preprocess_image(anchor),preprocess_image(positive)),ans)


In [None]:
train_ds_siamese = ds_oversample.shuffle(10*df_label.xs(0),seed = 50)
train_ds_siamese = train_ds_siamese.map(siamese_preprocess)
train_ds_siamese = train_ds_siamese.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
siamese_val_dataset = tf.data.Dataset.from_tensor_slices((val_list1,val_list2,val_label))
siamese_val_dataset = siamese_val_dataset.shuffle(len(val_label),seed = 50)
siamese_val_dataset = siamese_val_dataset.map(siamese_preprocess)
siamese_val_dataset = siamese_val_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
model_siamese = tf.keras.applications.InceptionResNetV2(
    include_top=False,
    weights=None,
    input_shape=(108,124,1),
    classes=10
)

In [None]:
for i,layer in enumerate(model_siamese.layers):
    if (i == 1):
        weights = dummy_model.get_layer(index=i).get_weights()[0]
#         bias = dummy_model.get_layer(index=i).get_weights()[1]
        weights = gray_weights(weights)
        layer.set_weights([weights[:,:,-1:,:]])
    if(i>1 and i < get_model_len(dummy_model)):
        if (dummy_model.get_layer(index=i).get_weights()!=[]):
            weights = dummy_model.get_layer(index=i).get_weights()
#             bias = dummy_model.get_layer(index=i).get_weights()[1]
            layer.set_weights(weights)

In [None]:
model_siamese = tf.keras.Sequential([
  RandomFlip("horizontal"),
  RandomRotation(0.06),
  model_siamese
])

In [None]:
input_1 =Input((108,124,1))
input_2 = Input((108,124,1))

In [None]:
concat = Concatenate()(
    [model_siamese(input_1),
    model_siamese(input_2)]
)

In [None]:
pool = GlobalAveragePooling2D()(concat)
drop = Dropout(0.8)(pool)
oputput = Dense(10,activation='softmax',dtype='float32', name='predictions')(drop)

In [None]:
siamese_network = tf.keras.Model(
    inputs=[input_1, input_2], outputs=oputput
)

In [None]:
siamese_network.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
siamese_network.summary()

In [None]:
siamese_history = siamese_network.fit(train_ds_siamese,validation_data=siamese_val_dataset,epochs=1000, callbacks=[early_stop,model_checkpoint])