In [None]:
!pip install numpy matplotlib scikit-image tensorflow -q

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from skimage.io import imread
from skimage.transform import resize
from tensorflow.keras.applications.resnet50 import preprocess_input

In [None]:
# Locations of the face-image folders and of the spreadsheets that list the labelled pairs.
# The folder paths must end with '/' because later cells build file paths by string concatenation.
train_faces_path='data/final-train-faces/'
train_faces_excel='data/train-set-pairs.xlsx'

validate_faces_path='data/final-validation-faces/'
validate_faces_excel='data/val-set-pairs.xlsx'

test_faces_path='data/final-test-faces/'
test_faces_excel='data/test-set-pairs.xlsx'

# Width of the fused feature vector for every supported fusion type.
FUSION_INPUT_DIMS = {
    'concatenation': 4096,
    'add_subtract': 4096,
    'add_subtract_multiply': 6144,
    'squared_difference_squared_sum': 4096,
    'squared_difference_squared_sum_multiply': 6144,
}

# Must be one of the keys of FUSION_INPUT_DIMS.
fusion_type = 'concatenation'

if fusion_type not in FUSION_INPUT_DIMS:
    raise ValueError(
        f"Unknown fusion_type {fusion_type!r}; expected one of {sorted(FUSION_INPUT_DIMS)}"
    )

# Derived from fusion_type so the two settings cannot drift apart.
fusion_input_dim = FUSION_INPUT_DIMS[fusion_type]

# Relationship types removed from all three splits before image pairs are built.
ignore_ptypes = ['gmgs', 'gfgs', 'gfgd', 'gmgd']

# True: recompute pairs/features and save them; False: load the saved .npy arrays instead.
calculate_features = True

In [None]:
import tensorflow as tf
# Report how many GPUs TensorFlow can see (stable API; the `experimental` alias is deprecated).
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


In [None]:
import pandas as pd 
from IPython.display import Image, display
from tqdm import tqdm

# Columns of the pair spreadsheets that the rest of the notebook uses.
PAIR_COLUMNS = ['p1', 'p2', 'ptype', 'nsamples']


def _read_pairs(excel_path):
    """Read one pair spreadsheet and keep only the columns in PAIR_COLUMNS."""
    return pd.read_excel(excel_path)[PAIR_COLUMNS]


def _drop_ignored_ptypes(pairs):
    """Return the rows whose 'ptype' is not listed in `ignore_ptypes`."""
    return pairs[~pairs['ptype'].isin(ignore_ptypes)]


train_df = _read_pairs(train_faces_excel)
val_df = _read_pairs(validate_faces_excel)
test_df = _read_pairs(test_faces_excel)

print(f'Train size before cleanup: {len(train_df)}')
print(f'Val size before cleanup: {len(val_df)}')
print(f'Test size before cleanup: {len(test_df)}')

# Filter out rows with the ignored ptype values
train_df = _drop_ignored_ptypes(train_df)
val_df = _drop_ignored_ptypes(val_df)
test_df = _drop_ignored_ptypes(test_df)

print(f'Train size after cleanup: {len(train_df)}')
print(f'Val size after cleanup: {len(val_df)}')
print(f'Test size after cleanup: {len(test_df)}')

# Preview only; the row counts are printed above.
train_df.head()

In [None]:
import wandb
# Authenticate with Weights & Biases; the sweep and training cells below call wandb.sweep / wandb.init.
wandb.login()

# Enrich face image pairs

In [None]:
import pandas as pd
import os
from tqdm import tqdm
def enrich_face_image_pairs(path, df):
    """Expand every (p1, p2) row into all combinations of the two folders' files.

    Args:
        path: Root folder of the split. It is concatenated directly with the
            'p1' / 'p2' values, so it must end with a path separator.
        df: DataFrame with the columns 'p1', 'p2' and 'ptype'; 'p1' and 'p2'
            name folders below `path`.

    Returns:
        DataFrame with the columns ['p1_path', 'p2_path', 'ptype', 'tag'] and
        one row per file combination. The paths are relative to `path` and
        formatted as '<folder>/<file>'; 'tag' is the running row number.
        Files are listed in sorted order so repeated runs give the same rows.
    """
    columns = ['p1_path', 'p2_path', 'ptype', 'tag']
    records = []
    for p1, p2, ptype in tqdm(zip(df['p1'], df['p2'], df['ptype']), total=len(df)):
        # List each folder once per row instead of once per inner-loop iteration.
        p1_files = sorted(os.listdir(path + p1))
        p2_files = sorted(os.listdir(path + p2))
        for p1_file in p1_files:
            for p2_file in p2_files:
                # '/' is used on purpose: other cells rebuild the same strings by concatenation.
                records.append((p1 + '/' + p1_file, p2 + '/' + p2_file, ptype, len(records)))
    # Build the frame once; appending with .loc row by row gets slower as the frame grows.
    return pd.DataFrame(records, columns=columns)

if calculate_features:
    # Build the image-pair table of each split, in train / validation / test order.
    train_new_pair, val_new_pair, test_new_pair = [
        enrich_face_image_pairs(faces_root, pair_table)
        for faces_root, pair_table in (
            (train_faces_path, train_df),
            (validate_faces_path, val_df),
            (test_faces_path, test_df),
        )
    ]


In [None]:
def _with_root(relative_paths, root):
    """Prefix `root` onto every path of the Series that does not start with it yet.

    The guard makes the cell safe to re-run: paths that already carry the
    root are left unchanged instead of being prefixed a second time.
    """
    already_rooted = relative_paths.str.startswith(root)
    return relative_paths.where(already_rooted, root + relative_paths)


if calculate_features:
    # Kept so that `progress_apply` stays registered on pandas objects.
    tqdm.pandas(desc='Processing:')

    # Vectorised string concatenation replaces the row-by-row apply.
    for pair_table, faces_root in (
        (train_new_pair, train_faces_path),
        (val_new_pair, validate_faces_path),
        (test_new_pair, test_faces_path),
    ):
        for column in ('p1_path', 'p2_path'):
            pair_table[column] = _with_root(pair_table[column], faces_root)

    print(train_new_pair)
    print(val_new_pair)
    print(test_new_pair)

In [None]:
if calculate_features:
    # Expressions nested inside an `if` are not echoed by Jupyter, so print the totals explicitly.
    print(f"Train nsamples total: {train_df['nsamples'].sum()}")
    print(f"Val nsamples total: {val_df['nsamples'].sum()}")
    print(f"Test nsamples total: {test_df['nsamples'].sum()}")

In [None]:
if calculate_features:
    # Write one '<fusion_type>_full_<split>_pairs.csv' file per split into the working directory.
    for split_name, pair_table in (
        ('train', train_new_pair),
        ('val', val_new_pair),
        ('test', test_new_pair),
    ):
        pair_table.to_csv(f'{fusion_type}_full_{split_name}_pairs.csv')

# Feature Extraction

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50
# Load ResNet50 with ImageNet weights and default arguments, then display its layer summary.
# NOTE(review): this binds the global name `model`, which later cells also reference — confirm that is intended.
model = ResNet50(weights='imagenet')
model.summary()

In [None]:
def create_base_network(input_shape):
    """Return an ImageNet-pretrained ResNet50 without its top, with 'avg' pooling.

    Args:
        input_shape: Shape of one input image, passed through to ResNet50.
    """
    return ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
        pooling='avg',
    )

# 224x224 images with 3 channels; cal_feature resizes every image to 224x224 before prediction.
input_shape = (224, 224, 3)
# Feature extractor used to compute the per-image features in the cells below.
base_network = create_base_network(input_shape)
base_network.summary()

In [None]:
def feature_concatenation(x, y):
    """Join the two feature arrays end to end along their last axis."""
    parts = [x, y]
    return np.concatenate(parts, axis=-1)

def feature_add_subtract(x, y):
    """Concatenate the element-wise sum and difference along the last axis."""
    total = x + y
    delta = x - y
    return np.concatenate([total, delta], axis=-1)

def feature_add_subtract_multiply(x, y):
    """Concatenate the element-wise sum, difference and product along the last axis."""
    total = x + y
    delta = x - y
    product = x * y
    return np.concatenate([total, delta, product], axis=-1)

def feature_squared_difference_squared_sum(x, y):
    """Concatenate x**2 - y**2 and (x - y)**2 along the last axis."""
    difference_of_squares = x**2 - y**2
    squared_difference = (x - y)**2
    return np.concatenate([difference_of_squares, squared_difference], axis=-1)

def feature_squared_difference_squared_sum_multiply(x, y):
    """Concatenate x**2 - y**2, (x - y)**2 and x * y along the last axis."""
    difference_of_squares = x**2 - y**2
    squared_difference = (x - y)**2
    product = x * y
    return np.concatenate([difference_of_squares, squared_difference, product], axis=-1)

In [None]:
def cal_feature(p_path, model):
    """Load one image file and return the model's prediction for it.

    Args:
        p_path: Path of the image file to read.
        model: Object with a Keras-style `predict(batch, verbose=...)` method.

    Returns:
        Whatever `model.predict` returns for a batch that holds this single
        image, resized to 224x224 and passed through `preprocess_input`.
    """
    image = imread(p_path)
    if image.ndim == 2:
        # Grayscale file: repeat the single channel so three channels are passed on.
        image = np.stack([image] * 3, axis=-1)
    elif image.ndim == 3 and image.shape[-1] == 4:
        # RGBA file: drop the alpha channel.
        image = image[..., :3]
    image_224 = resize(image, (224, 224), preserve_range=True, mode='reflect')
    image_224_batch = np.expand_dims(image_224, axis=0)
    preprocessed_batch = preprocess_input(image_224_batch)
    # verbose=0: this function is called once per image, so the per-call progress bar is suppressed.
    feature_arr = model.predict(preprocessed_batch, verbose=0)
    return feature_arr

In [None]:
def fuse_features(feature_x, feature_y, fusion_type='concatenation'):
    """Combine two feature arrays with the fusion function named by `fusion_type`.

    Raises:
        ValueError: If `fusion_type` is not one of the five supported names.
    """
    # Lambdas defer the lookup of each fusion function until it is selected.
    strategies = (
        ('concatenation', lambda: feature_concatenation(feature_x, feature_y)),
        ('add_subtract', lambda: feature_add_subtract(feature_x, feature_y)),
        ('add_subtract_multiply', lambda: feature_add_subtract_multiply(feature_x, feature_y)),
        ('squared_difference_squared_sum',
         lambda: feature_squared_difference_squared_sum(feature_x, feature_y)),
        ('squared_difference_squared_sum_multiply',
         lambda: feature_squared_difference_squared_sum_multiply(feature_x, feature_y)),
    )
    for name, combine in strategies:
        if fusion_type == name:
            return combine()
    raise ValueError("Invalid fusion type")

In [None]:
from tensorflow.keras.layers import Dense

def calculate_similarity(p_path1, p_path2, model, fusion_type='concatenation'):
    """Extract features for both images, fuse them and pass the result to compute_similarity."""
    embeddings = [cal_feature(image_path, model) for image_path in (p_path1, p_path2)]
    fused = fuse_features(embeddings[0], embeddings[1], fusion_type=fusion_type)
    return compute_similarity(fused)

def compute_similarity(fused_features):
    """Pass the fused features through a 128-unit ReLU layer and a 1-unit sigmoid layer.

    Both Dense layers are constructed inside the function, so every call
    creates new layer objects.
    NOTE(review): nothing in this function trains or loads weights for these
    layers — confirm whether the returned score is meant to be used.
    """
    fc1 = Dense(128, activation='relu')(fused_features)
    fc2 = Dense(1, activation='sigmoid')(fc1)
    return fc2

In [None]:
def _list_mid_images(root):
    """Collect the paths of all files below `root`/<folder>/<subfolder containing 'MID'>/.

    Paths are built by plain string concatenation with '/' so that they are
    identical to the pair paths used as dictionary keys in later cells.
    Entries that are not directories (stray files) are skipped, and listings
    are sorted so that repeated runs yield the same order.
    """
    image_paths = []
    for family in sorted(os.listdir(root)):
        family_dir = root + family
        if not os.path.isdir(family_dir):
            continue
        for member in sorted(os.listdir(family_dir)):
            member_dir = family_dir + '/' + member
            if 'MID' not in member or not os.path.isdir(member_dir):
                continue
            for image in sorted(os.listdir(member_dir)):
                image_paths.append(member_dir + '/' + image)
    return image_paths


if calculate_features:
    train_lis = _list_mid_images(train_faces_path)
    val_lis = _list_mid_images(validate_faces_path)
    test_lis = _list_mid_images(test_faces_path)

    # Show the size and one example path per split; an empty split no longer raises IndexError.
    for split_name, image_paths in (('train', train_lis), ('val', val_lis), ('test', test_lis)):
        print(len(image_paths))
        print(image_paths[0] if image_paths else f'No images found for {split_name}')


In [None]:
if calculate_features:
    # Map every image path of a split to the feature computed by base_network.
    train_dic = {image: cal_feature(image, base_network) for image in tqdm(train_lis)}

    val_dic = {image: cal_feature(image, base_network) for image in tqdm(val_lis)}

    test_dic = {image: cal_feature(image, base_network) for image in tqdm(test_lis)}

In [None]:
if calculate_features:
    # Attach to every pair row the cached feature of each of its two images (lookup by image path).
    train_new_pair['p1_feature'] = train_new_pair.apply(lambda x: train_dic[x['p1_path']], axis=1)
    train_new_pair['p2_feature'] = train_new_pair.apply(lambda x: train_dic[x['p2_path']], axis=1)
    val_new_pair['p1_feature'] = val_new_pair.apply(lambda x: val_dic[x['p1_path']], axis=1)
    val_new_pair['p2_feature'] = val_new_pair.apply(lambda x: val_dic[x['p2_path']], axis=1)
    test_new_pair['p1_feature'] = test_new_pair.apply(lambda x: test_dic[x['p1_path']], axis=1)
    test_new_pair['p2_feature'] = test_new_pair.apply(lambda x: test_dic[x['p2_path']], axis=1)

    # Combine the two features of every pair with the fusion selected by `fusion_type`.
    train_new_pair['feature_distance'] = train_new_pair.apply(lambda x: fuse_features(x['p1_feature'], x['p2_feature'], fusion_type), axis=1)
    val_new_pair['feature_distance'] = val_new_pair.apply(lambda x: fuse_features(x['p1_feature'], x['p2_feature'], fusion_type), axis=1)
    test_new_pair['feature_distance'] = test_new_pair.apply(lambda x: fuse_features(x['p1_feature'], x['p2_feature'], fusion_type), axis=1)


In [None]:
if calculate_features:
    tqdm.pandas(desc='Processing:')

    # Same lookups as before, with progress bars. 'feature_distance' is now computed with
    # fuse_features instead of |p1 - p2|, so the fusion selected by `fusion_type` is not
    # discarded and the feature width stays in line with `fusion_input_dim`.
    for pair_table, feature_by_path in (
        (train_new_pair, train_dic),
        (val_new_pair, val_dic),
        (test_new_pair, test_dic),
    ):
        pair_table['p1_feature'] = pair_table.progress_apply(
            lambda x, lookup=feature_by_path: lookup[x['p1_path']], axis=1)
        pair_table['p2_feature'] = pair_table.progress_apply(
            lambda x, lookup=feature_by_path: lookup[x['p2_path']], axis=1)
        pair_table['feature_distance'] = pair_table.progress_apply(
            lambda x: fuse_features(x['p1_feature'], x['p2_feature'], fusion_type), axis=1)

In [None]:
if calculate_features:
    # Print the three pair tables in train / validation / test order.
    for pair_table in (train_new_pair, val_new_pair, test_new_pair):
        print(pair_table)

In [None]:
if calculate_features:
    # Per split: the 'ptype' labels and the 'feature_distance' entries as NumPy arrays.
    wanted_columns = ('ptype', 'feature_distance')

    train_ptype_arr, train_distance = (train_new_pair[column].values for column in wanted_columns)

    val_ptype_arr, val_distance = (val_new_pair[column].values for column in wanted_columns)

    test_ptype_arr, test_distance = (test_new_pair[column].values for column in wanted_columns)

In [None]:
def process_distance_list(distance_list, list_name, chunk_size, expected_shape):
    """Stack a sequence of feature arrays into one 2-D array, chunk by chunk.

    Args:
        distance_list: Sequence of arrays accepted by np.vstack.
        list_name: Label used in the progress messages.
        chunk_size: Number of entries stacked per full chunk.
        expected_shape: Shape a stacked full chunk must have; for the final
            partial chunk only the second entry (column count) is compared.

    Returns:
        The vertically stacked array of all accepted chunks, or None when no
        chunk was accepted. Rejected chunks are reported and left out.
    """
    total = len(distance_list)
    print(f"Length of {list_name}: {total}")
    full_chunks = total // chunk_size
    print(f"Number of full chunks for {list_name}: {full_chunks}")

    accepted = []
    for chunk_no in tqdm(range(full_chunks)):
        first = chunk_no * chunk_size
        block = distance_list[first:first + chunk_size]
        if len(block) != chunk_size:
            print(f"Skipping iteration {chunk_no} for {list_name} due to incorrect slice length: {len(block)}")
            continue
        stacked = np.vstack(block)
        if stacked.shape != expected_shape:
            print(f"Skipping iteration {chunk_no} for {list_name} due to shape mismatch: {stacked.shape}")
            continue
        accepted.append(stacked)

    # Entries left over after the last full chunk
    tail = distance_list[full_chunks * chunk_size:]
    if len(tail) > 0:
        stacked_tail = np.vstack(tail)
        if stacked_tail.shape[1] != expected_shape[1]:  # only the column count is compared
            print(f"Skipping remaining elements for {list_name} due to shape mismatch: {stacked_tail.shape}")
        else:
            accepted.append(stacked_tail)

    if not accepted:
        print(f"No valid arrays to concatenate for {list_name}")
        return None

    merged = np.vstack(accepted)
    print(f"Final {list_name} shape: {merged.shape}")
    return merged

if calculate_features:
    chunk_size = 4409  # number of rows stacked per np.vstack call

    if len(train_distance) == 0:
        raise ValueError("train_distance is empty; there are no features to stack")

    # Take the feature width from the data instead of hard-coding 2048, so the shape
    # check follows whatever features were actually computed.
    feature_dim = np.asarray(train_distance[0]).shape[-1]
    expected_shape = (chunk_size, feature_dim)

    train_dis_arr = process_distance_list(train_distance, "train_distance", chunk_size, expected_shape)
    val_dis_arr = process_distance_list(val_distance, "val_distance", chunk_size, expected_shape)
    test_dis_arr = process_distance_list(test_distance, "test_distance", chunk_size, expected_shape)

    # Fail loudly when rows were dropped: features and labels are matched by position later on.
    for split_name, features, labels in (
        ('train', train_dis_arr, train_ptype_arr),
        ('val', val_dis_arr, val_ptype_arr),
        ('test', test_dis_arr, test_ptype_arr),
    ):
        if features is None or len(features) != len(labels):
            stacked_rows = 0 if features is None else len(features)
            raise ValueError(
                f"{split_name}: stacked {stacked_rows} feature rows but there are {len(labels)} labels"
            )
        print(f"{split_name}_dis_arr.shape: {features.shape}")


In [None]:
if calculate_features:
    # Feature-matrix shape next to label-array shape, one line per split.
    for features, labels in (
        (train_dis_arr, train_ptype_arr),
        (val_dis_arr, val_ptype_arr),
        (test_dis_arr, test_ptype_arr),
    ):
        print(features.shape, labels.shape)

In [None]:
def _load_saved_array(file_name, allow_pickle=False):
    """Load one .npy file written by this notebook.

    Raises:
        FileNotFoundError: If `file_name` does not exist.
    """
    if not os.path.exists(file_name):
        raise FileNotFoundError(f"{file_name} not found.")
    return np.load(file_name, allow_pickle=allow_pickle)


if calculate_features:
    for split_name, features, labels in (
        ('train', train_dis_arr, train_ptype_arr),
        ('val', val_dis_arr, val_ptype_arr),
        ('test', test_dis_arr, test_ptype_arr),
    ):
        np.save(f'{fusion_type}_{split_name}_dis_arr.npy', features)
        # Store labels with a string dtype so the file can be read back without pickle.
        np.save(f'{fusion_type}_{split_name}_ptype_arr.npy', np.asarray(labels, dtype=str))
else:
    # Load the saved files.
    # Label files written by earlier runs hold object arrays, which np.load rejects unless
    # allow_pickle=True. Only load files you created yourself: unpickling untrusted files
    # can execute arbitrary code.
    train_dis_arr = _load_saved_array(f'{fusion_type}_train_dis_arr.npy')
    train_ptype_arr = _load_saved_array(f'{fusion_type}_train_ptype_arr.npy', allow_pickle=True)

    val_dis_arr = _load_saved_array(f'{fusion_type}_val_dis_arr.npy')
    val_ptype_arr = _load_saved_array(f'{fusion_type}_val_ptype_arr.npy', allow_pickle=True)

    test_dis_arr = _load_saved_array(f'{fusion_type}_test_dis_arr.npy')
    test_ptype_arr = _load_saved_array(f'{fusion_type}_test_ptype_arr.npy', allow_pickle=True)

# One Hot Encoding

In [None]:
def switch_label(label):
    """Return the integer class id of a relationship label, or None if the label is unknown."""
    ordered_labels = (
        'ms', 'fs', 'bb', 'sibs', 'fd', 'md', 'ss',
        'gfgs', 'gfgd', 'gmgs', 'gmgd',
    )
    # The position in the tuple is the class id.
    codes = {name: code for code, name in enumerate(ordered_labels)}
    return codes.get(label)

In [None]:
# Display the training label array that is encoded in the next cells.
train_ptype_arr

In [None]:
# Encode the string relationship labels of both splits as integer class ids.
train_label_switched = np.array([switch_label(ptype) for ptype in train_ptype_arr])
val_label_switched = np.array([switch_label(ptype) for ptype in val_ptype_arr])

train_label_switched

In [None]:
from tensorflow.keras.utils import to_categorical
# Use one shared class count for both splits. Without it, to_categorical sizes each
# matrix from that split's own largest label, so the two widths can differ.
num_classes = int(max(train_label_switched.max(), val_label_switched.max())) + 1
y_train1 = to_categorical(train_label_switched, num_classes=num_classes)
y_val1 = to_categorical(val_label_switched, num_classes=num_classes)
y_train1

In [None]:
def pre2label(pred):
    """Translate a score vector into the relationship label of its largest entry.

    The first position wins on ties; a position without a label yields None.
    """
    scores = pred.tolist()
    best_position = scores.index(max(scores))
    ordered_labels = (
        'ms', 'fs', 'bb', 'sibs', 'fd', 'md', 'ss',
        'gfgs', 'gfgd', 'gmgs', 'gmgd',
    )
    return dict(enumerate(ordered_labels)).get(best_position)

In [None]:
# Class balance of the training pairs. Counted from train_ptype_arr because train_new_pair
# only exists when calculate_features is True, while the label array is also available
# after loading the saved .npy files.
pd.Series(train_ptype_arr, name='ptype').value_counts()

In [None]:
# Integer class ids for the training and validation labels (repeats the encoding of an earlier cell).
train_label_switched = np.array([switch_label(ptype) for ptype in train_ptype_arr])
val_label_switched = np.array([switch_label(ptype) for ptype in val_ptype_arr])
train_label_switched

In [None]:
from tensorflow.keras.utils import to_categorical
# Use one shared class count for both splits. Without it, to_categorical sizes each
# matrix from that split's own largest label, so the two widths can differ.
num_classes = int(max(train_label_switched.max(), val_label_switched.max())) + 1
y_train1 = to_categorical(train_label_switched, num_classes=num_classes)
y_val1 = to_categorical(val_label_switched, num_classes=num_classes)
y_train1

# Classification Model

In [None]:
import wandb
from wandb.integration.keras import WandbCallback
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import EarlyStopping

# Candidate values per sweep parameter; each list is wrapped as {'values': [...]} below.
search_space = {
    'learning_rate': [0.001, 0.002, 0.005],
    'batch_size': [16, 32, 64],
    'decay': [1e-6, 1e-5, 1e-4],
    'momentum': [0.8, 0.9, 0.95],
    'train_data': [train_faces_path],
    'fusion_type': [fusion_type],
}

# Sweep definition: grid method ('grid', 'random', or 'bayes'), maximizing val_accuracy.
sweep_config = {
    'method': 'grid',
    'metric': {'name': 'val_accuracy', 'goal': 'maximize'},
    'parameters': {
        name: {'values': candidates} for name, candidates in search_space.items()
    },
}

# Register the sweep in the "DSPRO2" project and keep its id for the agent.
sweep_id = wandb.sweep(sweep_config, project="DSPRO2")

def train():
    """Train the kinship classifier for one W&B run and publish it as the global `model`.

    Reads the globals train_dis_arr / y_train1 (training data), val_dis_arr /
    y_val1 (validation data) and fusion_input_dim; hyperparameters come from
    wandb.config. Takes no arguments and returns nothing, as required by
    wandb.agent.
    """
    # Publish the trained classifier so the prediction and save cells use it,
    # not whatever object was bound to `model` before.
    global model

    # Initialize a new W&B run
    wandb.init()
    try:
        config = wandb.config

        # Size the input layer from the data itself so it always matches the features.
        input_dim = train_dis_arr.shape[1]
        if input_dim != fusion_input_dim:
            print(f"Warning: features have {input_dim} columns but fusion_input_dim is {fusion_input_dim}")

        classifier = Sequential()
        classifier.add(Dense(100, activation='relu', input_dim=input_dim))
        classifier.add(Dense(100, activation='tanh'))
        # softmax: the labels are one-hot (one class per pair) and the loss is categorical_crossentropy.
        classifier.add(Dense(y_train1.shape[1], activation='softmax'))

        sgd = SGD(learning_rate=config.learning_rate, decay=config.decay,
                  momentum=config.momentum, nesterov=True)
        classifier.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])

        # Stop when the training loss has not improved for 5 epochs and restore the best weights.
        early_stopping = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)

        # Fit the model with the WandbCallback and EarlyStopping
        classifier.fit(train_dis_arr, y_train1, validation_data=(val_dis_arr, y_val1), verbose=2,
                       epochs=500, batch_size=config.batch_size,
                       callbacks=[WandbCallback(), early_stopping])
        model = classifier
    finally:
        # Close the run even when building or fitting fails.
        wandb.finish()

# Run the sweep
# count=1 is passed to the agent, limiting how many runs of `train` it executes.
wandb.agent(sweep_id, function=train, count=1)
wandb.finish()

In [None]:
# Score the test features with the object bound to the global name `model`.
# NOTE(review): confirm that `model` refers to the trained classifier at this point.
preds = model.predict(test_dis_arr)
preds

In [None]:
# Convert every prediction row into a relationship label (position of its largest score).
preds_label=np.array(list(map(pre2label,preds)))
from sklearn.metrics import accuracy_score
# Share of test pairs whose predicted label equals the true label.
accuracy_score(test_ptype_arr, preds_label) 

In [None]:
# One row per test pair. The predictions were made on the test features, so the
# true labels must come from the test set as well (not from the validation set).
df4 = pd.DataFrame({'true_label': test_ptype_arr, 'predict_label': preds_label})
# True where the prediction matches the true label.
df4['result'] = df4['true_label'] == df4['predict_label']
df4

In [None]:
# Accuracy per relationship type, in order of first appearance in df4.
acc_list = []
true_labels = df4['true_label']
for kinship in true_labels.unique():
    hits = df4.loc[true_labels == kinship, 'result']
    accuracy = hits.sum() / len(hits)
    acc_list.append(accuracy)
    print('Accuracy of {} is {}'.format(kinship, accuracy))

In [None]:
# Summary table: one row per relationship type, rounded to 4 decimals, best accuracy first.
df5 = pd.DataFrame({
    'kinship': df4['true_label'].unique(),
    'accuracy': acc_list,
})
df5['accuracy'] = df5['accuracy'].map(lambda value: round(value, 4))
df5 = df5.sort_values('accuracy', ascending=False).reset_index(drop=True)
df5

In [None]:
import plotly.express as px
# Bar chart of the per-relationship accuracies in df5; each bar is labelled and coloured by its accuracy.
fig = px.bar(df5, x='kinship', y='accuracy',text='accuracy',
             hover_data=['accuracy'], color='accuracy',
             height=600, title='Accuracy of each kinship')
# Place the value labels outside the bars.
fig.update_traces( textposition='outside')
# Set the chart title and the axis titles.
fig.update_layout(
    title="Accuracy of each kinship",
    xaxis_title="Kinship",
    yaxis_title="Accuracy",
)
fig.show()

### Save the model weights
Make sure you are already logged in to Hugging Face (run `huggingface-cli login`) before uploading.

In [None]:
# Write the weights of the object bound to `model` to '<fusion_type>_kinship.h5' in the working directory.
# NOTE(review): confirm that `model` is the trained classifier here.
model.save_weights(f'{fusion_type}_kinship.h5')

In [None]:
from huggingface_hub import HfApi

# Target repository on the Hugging Face Hub: "<username>/<repo_name>".
repo_name = "DSPRO2"
username = "Leo1212"
repo_id = f"{username}/{repo_name}"

# The weights file is uploaded under the same name it has locally.
weights_file = f"{fusion_type}_kinship.h5"

# The import and client were created twice in this cell; once is enough.
api = HfApi()

# Save the model weights to the repository
api.upload_file(
    path_or_fileobj=weights_file,
    path_in_repo=weights_file,
    repo_id=repo_id,
    repo_type="model"
)
