In [7]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda, Dropout, BatchNormalization, ReLU
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import backend as K
from sklearn.metrics import average_precision_score
import random
import os
import pandas as pd
import json
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
# Root directory of the dataset; the 'train' folder and 'train_image_info.json'
# are resolved relative to it further down.
dataset_dir = '/kaggle/input/img-to-img/human_activity_retrieval_dataset'
# Shape passed to Input(shape=...) by the network builders; the data generator
# resizes images to 196x196 (3 channels presumably RGB — confirm against the data).
input_shape = (196, 196, 3)
print('step_done')


step_done


In [10]:
# Create the base network (Siamese Network)
def create_base_network():
    """Build the convolutional embedding tower.

    Three Conv2D(3x3, ReLU) + MaxPooling2D(2x2) stages with 32, 64 and 128
    filters, followed by Flatten and a linear Dense(128) projection.

    Returns:
        A Keras ``Model`` mapping an image of shape ``input_shape`` to a
        128-dimensional (un-normalised) embedding.
    """
    inp = Input(shape=input_shape)

    features = inp
    for n_filters in (32, 64, 128):
        features = Conv2D(n_filters, (3, 3), activation='relu')(features)
        features = MaxPooling2D((2, 2))(features)

    flat = Flatten()(features)
    # No activation: the embedding is left linear (any normalisation is the caller's job).
    output = Dense(128, activation=None)(flat)
    return Model(inp, output)

def create_base_network_2():
    """Build the fully-connected embedding tower.

    The image is flattened and passed through two Dense -> ReLU -> Dropout(0.5)
    -> BatchNormalization stages (512 then 256 units), followed by a linear
    Dense(15) output.

    Returns:
        A Keras ``Model`` mapping an image of shape ``input_shape`` to a
        15-dimensional vector of shape ``(batch, 15)``.
    """
    inputs = Input(shape=input_shape)

    # Dense only transforms the last axis. Without flattening first it would be
    # applied independently to every pixel and the model would emit a
    # (batch, 196, 196, 15) tensor instead of one vector per image, which the
    # distance computation downstream (a sum over axis 1) cannot handle.
    x = Flatten()(inputs)

    for units in (512, 256):
        x = Dense(units)(x)
        x = ReLU()(x)
        x = Dropout(0.5)(x)
        x = BatchNormalization()(x)

    # Output layer
    outputs = Dense(15)(x)  # Total 15 classes

    return Model(inputs=inputs, outputs=outputs)

# Module-level embedding network; train_model() applies this same instance to
# both inputs, so the two branches share weights.
# NOTE(review): the convolutional create_base_network() is defined but not used here.
base_network = create_base_network_2()
print('step_done')

step_done


In [11]:
def get_generator(dir_path, info_path, batch_size = 1, limit_per_demographic = 10):
    """Create a non-shuffling Keras image iterator over a capped subset of images.

    Args:
        dir_path: Directory containing the image files.
        info_path: Path to a JSON file mapping image file name -> category name.
        batch_size: Batch size of the returned iterator.
        limit_per_demographic: Maximum number of images kept per category.

    Returns:
        ``(generator, paths)`` where ``generator`` is the iterator returned by
        ``ImageDataGenerator.flow_from_dataframe`` (one-hot labels, no
        shuffling) and ``paths`` is the list of selected image paths in the
        order they were put into the dataframe.

    Raises:
        KeyError: if the JSON contains a category missing from the mapping below.
    """
    # Define mapping from categories to numerical values
    category_mapping = {'calling': 1, 'clapping': 2, 'cycling': 3, 'dancing': 4, 'drinking': 5, 'eating': 6,
                        'fighting': 7, 'hugging': 8, 'laughing': 9, 'listening_to_music': 10, 'running': 11,
                        'sitting': 12, 'sleeping': 13, 'texting': 14,
                        'using_laptop': 15}
    # Load image-to-label mappings
    with open(info_path) as f:
        info_map = json.load(f)
    # Convert categories to numerical values (as strings, which is what
    # class_mode='categorical' expects in the label column).
    info_map = {key: str(category_mapping[value]) for key, value in info_map.items()}

    paths = []
    labels_for_images = []
    final_map = {}  # label -> number of images kept so far
    for file_name in sorted(info_map.keys()):
        label = info_map[file_name]
        kept = final_map.setdefault(label, 0)
        if kept >= limit_per_demographic:
            continue

        file_path = os.path.join(dir_path, file_name)
        # Files listed in the JSON but missing on disk are silently skipped.
        if os.path.isfile(file_path):
            paths.append(file_path)
            labels_for_images.append(label)
            final_map[label] = kept + 1

    print(final_map)

    # featurewise_center needs the generator to be fit() on data to learn a
    # mean. It never was, so the option only produced a Keras warning and no
    # centering. It is switched off here; the emitted pixel values are
    # unchanged (still unscaled).
    img_datagen = ImageDataGenerator(
        featurewise_center=False,
        horizontal_flip=False,
        vertical_flip=False,
        preprocessing_function=None,
        data_format=None,
        dtype=None)

    train_df = pd.DataFrame({'filename': paths, 'label': labels_for_images})
    width = 196
    height = 196
    # NOTE(review): class indices are inferred from the string labels by Keras,
    # so the one-hot position presumably follows string sort order ('1', '10',
    # '11', ...) rather than the numeric category value — confirm if the index matters.
    generator = img_datagen.flow_from_dataframe(
        dataframe=train_df,
        directory=dir_path,
        x_col='filename',
        y_col='label',
        target_size=(height, width),
        batch_size=batch_size,
        class_mode='categorical',
        interpolation="nearest",
        shuffle=False
    )
    return generator, paths


def get_train_generator(limit_per_demographic = 10):
    """Return ``get_generator``'s result for the training split with batch size 32.

    Args:
        limit_per_demographic: Maximum number of images kept per category.
    """
    return get_generator(
        os.path.join(dataset_dir, 'train'),                  # image directory
        os.path.join(dataset_dir, 'train_image_info.json'),  # file name -> category
        32,
        limit_per_demographic,
    )
# Build the training iterator with at most 50 images per category. Both names
# are module-level globals that get_train_images_and_labels() reads.
train_generator, image_paths = get_train_generator(50)
print('step_done', len(image_paths))
print(image_paths[:3])

{'12': 50, '4': 50, '7': 50, '15': 50, '9': 50, '10': 50, '14': 50, '6': 50, '2': 50, '3': 50, '5': 50, '13': 50, '1': 50, '8': 50, '11': 50}
Found 750 validated image filenames belonging to 15 classes.
step_done 750
['/kaggle/input/img-to-img/human_activity_retrieval_dataset/train/Image_1.jpg', '/kaggle/input/img-to-img/human_activity_retrieval_dataset/train/Image_10.jpg', '/kaggle/input/img-to-img/human_activity_retrieval_dataset/train/Image_100.jpg']


In [12]:
def create_pairs(image_paths, image_data, labels, num_positive=8, num_negative=4):
    """Build labelled image pairs for contrastive training.

    For every sample, ``num_positive`` partners are drawn at random from the
    same class (never the sample itself) and ``num_negative`` partners from a
    randomly chosen different class. Partners are drawn with replacement, so a
    pair can repeat.

    Args:
        image_paths: Unused; kept so existing callers keep working.
        image_data: Indexable collection of images, aligned with ``labels``.
        labels: 1-D array-like of class labels. Any hashable values are
            accepted; they do not have to be ``0..K-1``.
        num_positive: Same-class pairs generated per sample (default 8).
        num_negative: Different-class pairs generated per sample (default 4).

    Returns:
        ``(pairs, pair_labels)`` as numpy arrays. ``pairs[i]`` holds the two
        images of pair ``i`` and ``pair_labels[i]`` is 1 for a same-class pair
        and 0 otherwise. Per sample, positives come first, then negatives.
        Every image is copied into the result once per pair it appears in.

    Raises:
        ValueError: if positives are requested for a sample whose class has
            fewer than two members, or negatives are requested with fewer
            than two classes. These cases used to loop forever.
    """
    labels = np.asarray(labels)
    label_values = labels.tolist()
    unique_labels = np.unique(labels).tolist()
    num_classes = len(unique_labels)
    # Sample indices per class, keyed by the label value itself. Plain lists
    # keep random.choice happy on every Python version.
    class_members = {lbl: np.where(labels == lbl)[0].tolist() for lbl in unique_labels}

    n_samples = len(image_data)
    if n_samples and num_negative > 0 and num_classes < 2:
        raise ValueError("negative pairs need at least two distinct classes")

    img_pairs = []
    labels_pair = []

    for idx1 in range(n_samples):
        current_image = image_data[idx1]
        label = label_values[idx1]
        same_class = class_members[label]

        if num_positive > 0 and len(same_class) < 2:
            raise ValueError(
                "class %r has a single sample; cannot build a positive pair" % (label,)
            )

        # Positive pairs: rejection-sample a same-class partner other than idx1.
        for _ in range(num_positive):
            partner = random.choice(same_class)
            while partner == idx1:
                partner = random.choice(same_class)
            img_pairs.append([current_image, image_data[partner]])
            labels_pair.append(1)

        # Negative pairs: pick a different class first, then any member of it.
        for _ in range(num_negative):
            neg_pos = random.randint(0, num_classes - 1)
            while unique_labels[neg_pos] == label:
                neg_pos = random.randint(0, num_classes - 1)
            partner = random.choice(class_members[unique_labels[neg_pos]])
            img_pairs.append([current_image, image_data[partner]])
            labels_pair.append(0)

    return np.array(img_pairs), np.array(labels_pair)

def get_train_images_and_labels():
    """Read one full pass of ``train_generator`` and turn it into training pairs.

    Uses the module-level ``train_generator`` and ``image_paths``.

    Returns:
        ``(final_pairs, final_labels)`` exactly as produced by ``create_pairs``.
    """
    collected_images = []
    collected_one_hot = []

    # len(generator) is the number of batches in one pass over the data.
    n_batches = len(train_generator)
    for _ in range(n_batches):
        batch = next(train_generator)
        collected_images += list(batch[0])
        collected_one_hot += list(batch[1])

    images = np.array(collected_images)
    # One-hot rows -> integer class index per image.
    labels = np.argmax(collected_one_hot, axis=1)
    print('images_len', len(images))
    print('labels_len', len(labels))

    final_pairs, final_labels = create_pairs(image_paths, images, labels)
    return final_pairs, final_labels
# Materialise the training pairs once; train_model() reads `pairs` and
# `labels` as module-level globals.
pairs, labels = get_train_images_and_labels()
print('step_done')
print('labels_len', len(labels))
print('pairs_len', len(pairs))



images_len 750
labels_len 750
step_done
labels_len 9000
pairs_len 9000


In [None]:
from tensorflow.keras.layers import Layer
@tf.keras.utils.register_keras_serializable(package='Custom')
def euclidean_distance(vectors):
    """Euclidean distance between two batches of vectors.

    Args:
        vectors: A pair ``(featsA, featsB)`` of equally shaped tensors.

    Returns:
        The distance reduced over axis 1 (kept as a size-1 axis). The squared
        sum is floored at ``K.epsilon()`` before the square root.
    """
    featsA, featsB = vectors
    squared_diff = K.square(featsA - featsB)
    sum_squared = K.sum(squared_diff, axis=1, keepdims=True)
    floored = K.maximum(sum_squared, K.epsilon())
    return K.sqrt(floored)

@tf.keras.utils.register_keras_serializable(package='Custom')
def contrastive_loss(y_true, y_pred, margin=1.0):
    """Contrastive loss on predicted pair distances.

    Args:
        y_true: Pair labels, 1 for a similar pair and 0 for a dissimilar pair.
            Any numeric dtype is accepted.
        y_pred: Predicted distances for the pairs.
        margin: Distance beyond which dissimilar pairs stop being penalised.

    Returns:
        Scalar tensor: mean of ``y * d^2 + (1 - y) * max(margin - d, 0)^2``.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    # Labels built from Python ints arrive as an integer tensor. TensorFlow
    # does not mix integer and float operands implicitly, so align dtypes first.
    y_true = tf.cast(y_true, y_pred.dtype)

    square_pred = tf.square(y_pred)
    margin_square = tf.square(tf.maximum(margin - y_pred, 0))
    return tf.reduce_mean(y_true * square_pred + (1 - y_true) * margin_square)

# Custom L2 Normalization Layer
@tf.keras.utils.register_keras_serializable(package='Custom')
class L2Normalization(Layer):
    """Layer that L2-normalises its input along the last axis."""

    def call(self, inputs):
        normalized = tf.nn.l2_normalize(inputs, axis=-1)
        return normalized
@tf.keras.utils.register_keras_serializable(package='Custom')
class EuclideanDistance(Layer):
    """Layer computing the Euclidean distance between two input tensors.

    Takes a pair of equally shaped tensors. The squared difference is summed
    over axis 1 (kept as a size-1 axis) and floored at ``K.epsilon()`` before
    the square root.
    """

    def call(self, inputs):
        featsA, featsB = inputs
        squared_diff = K.square(featsA - featsB)
        sum_squared = K.sum(squared_diff, axis=1, keepdims=True)
        floored = K.maximum(sum_squared, K.epsilon())
        return K.sqrt(floored)

def train_model():
    """Build, train and save the Siamese distance model.

    Both inputs go through the shared module-level ``base_network``, are
    L2-normalised, and their Euclidean distance is the model output. The model
    is trained with ``contrastive_loss`` on the module-level ``pairs`` /
    ``labels`` for 5 epochs and saved to
    '/kaggle/working/siamese_contrastive_full11.keras'.
    """
    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape)

    # The same base_network instance is applied to both inputs, so weights are shared.
    processed_a = base_network(input_a)
    processed_b = base_network(input_b)
    processed_a = L2Normalization()(processed_a)
    processed_b = L2Normalization()(processed_b)
    distance = EuclideanDistance()([processed_a, processed_b])

    model = Model([input_a, input_b], distance)
    optimizer = Adam(learning_rate=0.001)
    # No 'accuracy' metric: the output is a distance, not a probability. Keras
    # would threshold it at 0.5 and count a *large* distance as class 1, the
    # opposite of the label convention used here (1 = similar pair).
    model.compile(loss=contrastive_loss, optimizer=optimizer)

    # Pair labels are built from Python ints; the loss does float arithmetic
    # on them, so hand them over as float32.
    pair_targets = np.asarray(labels, dtype='float32')

    # Train the model
    model.fit([pairs[:, 0], pairs[:, 1]], pair_targets, batch_size=32, epochs=5)
    model.save('/kaggle/working/siamese_contrastive_full11.keras')
    print('done')

# Run training when this code is executed as the main module.
if __name__ == '__main__':
    train_model()

Epoch 1/5
