In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pandas as pd
import os
import random
import numpy as np
import cv2
import joblib
import helper_module
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications import ResNet50, VGG16, VGG19, DenseNet121, MobileNetV3Large, EfficientNetV2S
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import models, layers, Model
from tensorflow.keras.metrics import AUC
from tensorflow.image import ssim
from tensorflow.keras.layers import Lambda
from tensorflow.keras.callbacks import Callback
from tensorflow.keras import backend as K

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.metrics.pairwise import cosine_similarity



In [None]:


# Query the visible GPUs once and reuse the result for both messages.
# tf.test.is_gpu_available() is deprecated in TensorFlow 2.x, so the device
# list is used for the availability check as well.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

if gpu_devices:
    print('GPU is available')
else:
    print('GPU is not available')

In [None]:
# Load the three CSV tables through the project helper `read_csv_to_df`.
# Training pairs: the 'left' and 'right' columns are read later to build image paths.
train_pairing_df =helper_module.read_csv_to_df('dataset/train.csv')
# Candidate table for the test set (not used in the cells visible here).
test_candidates_df = helper_module.read_csv_to_df('dataset/test_candidates.csv')
# Vote table for the external archive images; the 'wins' and 'fails' columns are read later.
archive_images_df = helper_module.read_csv_to_df('archive/votes.csv')

In [None]:
from keras.preprocessing.image import ImageDataGenerator

# Data generator for training
# Additional data augmentation may be added here if desired
# NOTE(review): in the visible cells this object is only used through
# train_datagen.random_transform(); confirm which of these options take effect
# on that code path (in particular `rescale`).

train_datagen = ImageDataGenerator (
    rescale=1./255,         # Rescale factor 1/255 (maps 0-255 pixel values to [0, 1] where applied)
    rotation_range=0,      # Rotation range in degrees; 0 disables random rotation
    width_shift_range=0.2,  # Randomly shift image width by up to 20%
    height_shift_range=0.2, # Randomly shift image height by up to 20%
    shear_range=0.2,        # Shear transformations
    zoom_range=0.0,         # Zoom range; 0.0 disables random zoom
    horizontal_flip=False,   # Random horizontal flipping is disabled
    fill_mode='nearest',    # Fill mode for newly created pixels
    channel_shift_range=0.2, # Range of the random color channel offset
    brightness_range=[0.5, 1.5],  # Random brightness factor drawn from [0.5, 1.5]
)


In [None]:
# Define the Gaussian noise function
def add_gaussian_noise(image, sigma=25, mean=0):
    """Return a uint8 copy of ``image`` with additive Gaussian noise.

    Args:
        image: Array-like image with values on the 0-255 scale. Any shape is
            accepted (for example H x W x C colour or H x W grayscale).
        sigma: Standard deviation of the noise, in pixel-intensity units.
        mean: Mean of the noise (defaults to 0).

    Returns:
        np.ndarray of dtype uint8 with the same shape as ``image``.
    """
    image = np.asarray(image)
    # Draw the noise with the image's own shape so 2-D (grayscale) inputs work
    # instead of failing on a three-way shape unpack.
    gauss = np.random.normal(mean, sigma, image.shape)
    # Clip before the cast: values outside 0-255 cannot be represented in uint8.
    noisy = np.clip(image + gauss, 0, 255)
    return noisy.astype(np.uint8)

In [None]:
# Add salt and pepper noise function
def add_salt_and_pepper_noise(image, prob=0.01):
    """Return a uint8 copy of ``image`` corrupted with salt-and-pepper noise.

    About ``prob`` of the pixels are targeted: half are set to 255 (salt) and
    half to 0 (pepper), across all channels of the pixel. Positions are drawn
    with replacement, so the number of distinct corrupted pixels can be a
    little lower, and pepper may overwrite salt.

    Args:
        image: H x W x C or H x W array with values on the 0-255 scale.
        prob: Fraction of pixels to corrupt (salt and pepper combined).

    Returns:
        np.ndarray of dtype uint8 with the same shape as ``image``; the input
        array is not modified.
    """
    noisy = np.array(image, copy=True)
    height, width = noisy.shape[:2]

    # Count pixels, not array elements: whole pixels are overwritten below, so
    # including the channel axis would corrupt several times more than `prob`.
    total_pixels = height * width
    if total_pixels == 0:
        return noisy.astype(np.uint8)

    num_salt = int(np.ceil(prob * total_pixels * 0.5))
    num_pepper = int(np.ceil(prob * total_pixels * 0.5))

    # np.random.randint excludes its upper bound, so pass the full extent;
    # otherwise the last row and column could never be selected.
    salt_rows = np.random.randint(0, height, num_salt)
    salt_cols = np.random.randint(0, width, num_salt)
    # Indexing only the two spatial axes works for colour and grayscale alike.
    noisy[salt_rows, salt_cols] = 255

    pepper_rows = np.random.randint(0, height, num_pepper)
    pepper_cols = np.random.randint(0, width, num_pepper)
    noisy[pepper_rows, pepper_cols] = 0

    return noisy.astype(np.uint8)

In [None]:
# Define a function to process the input image

def load_and_preprocess_image(image_path, target_size=(224,224), random_transform=False, noise_type=None):
    """Load an image from disk, resize it, and optionally augment it and add noise.

    Args:
        image_path: Path of the image file to load.
        target_size: ``(height, width)`` handed to Keras ``load_img``.
        random_transform: If True, apply ``train_datagen.random_transform``
            to the loaded array.
        noise_type: ``None`` (no noise), ``'gaussian'`` or ``'salt_and_pepper'``.

    Returns:
        The image as a NumPy array. It is uint8 when it comes straight from
        ``img_to_array(..., dtype='uint8')`` or from one of the noise helpers.
        NOTE(review): with ``random_transform=True`` and no noise, the dtype is
        whatever ``random_transform`` returns; confirm if callers rely on uint8.

    Raises:
        ValueError: If ``noise_type`` is not one of the supported values.
    """
    # Fail fast on a mistyped noise name instead of silently returning a clean image.
    if noise_type not in (None, 'gaussian', 'salt_and_pepper'):
        raise ValueError(
            f"Unsupported noise_type {noise_type!r}; expected None, 'gaussian' or 'salt_and_pepper'"
        )

    # load_img (Pillow-backed) already resizes to target_size. A second PIL
    # ``img.resize(target_size)`` is redundant for square sizes and, because PIL
    # expects (width, height), would transpose non-square targets.
    img = load_img(image_path, target_size=target_size)

    # Convert the image pixels to a numpy array
    img = img_to_array(img, dtype='uint8')

    if random_transform:
        img = train_datagen.random_transform(img)

    # Apply the requested noise model, if any
    if noise_type == 'gaussian':
        img = add_gaussian_noise(img)
    elif noise_type == 'salt_and_pepper':
        img = add_salt_and_pepper_noise(img)

    return img


def create_train_valid_dataset(random_transform, left_noise_type=None,right_noise_type=None, num_right_images=20):
    """Build ``[left_image, [right_images...]]`` entries from ``train_pairing_df``.

    For every row, the left image and its matching right image are loaded, then
    ``num_right_images - 1`` right images from randomly chosen other rows are
    appended as dissimilar examples. The matching right image is always at
    index 0 of the right-image list. Every image is loaded through
    ``load_and_preprocess_image``, so each one gets its own random transform
    and noise.

    Args:
        random_transform: Forwarded to ``load_and_preprocess_image``.
        left_noise_type: Noise type applied to left images.
        right_noise_type: Noise type applied to right images.
        num_right_images: Total right images per entry (1 similar + the rest dissimilar).

    Returns:
        list of ``[left_image, right_images]`` entries, one per row.

    Raises:
        ValueError: If dissimilar images are requested but the table holds
            fewer than two distinct right images.
    """
    left_names = train_pairing_df['left'].tolist()
    right_names = train_pairing_df['right'].tolist()
    num_rows = len(right_names)

    # Without a second distinct right image the rejection loop below could never terminate.
    if num_rows and num_right_images > 1 and len(set(right_names)) < 2:
        raise ValueError("Need at least two distinct right images to sample dissimilar examples")

    dataset = []

    for left_name, right_name in zip(left_names, right_names):
        left_image = load_and_preprocess_image(f"dataset/train/left/{left_name}.jpg", random_transform=random_transform,noise_type=left_noise_type)

        # Load and preprocess the similar image; it stays at index 0
        similar_image = load_and_preprocess_image(f"dataset/train/right/{right_name}.jpg", random_transform=random_transform,noise_type=right_noise_type)
        right_images = [similar_image]

        for _ in range(num_right_images - 1):
            right_idx = random.randint(0, num_rows - 1)

            # Compare file names rather than row positions: this is independent
            # of the DataFrame's index labels and also rejects other rows that
            # reference the same right image.
            while right_names[right_idx] == right_name:
                right_idx = random.randint(0, num_rows - 1)

            right_image = load_and_preprocess_image(f"dataset/train/right/{right_names[right_idx]}.jpg", random_transform=random_transform,noise_type=right_noise_type)
            right_images.append(right_image)

        # Entry layout: [left image, [similar right image, dissimilar right images...]]
        dataset.append([left_image, right_images])

    return dataset


def create_train_valid_dataset_archive(random_transform,left_noise_type=None,right_noise_type=None, num_right_images=20):
    """Build ``[left_image, [right_images...]]`` entries from the archive images.

    A row of ``archive_images_df`` is used when both ``<index + 1>`` image
    files exist in the left/right archive directories and its ``wins`` exceed
    its ``fails``. The matching right image is at index 0 of the right-image
    list, followed by ``num_right_images - 1`` randomly chosen other right
    images.

    NOTE(review): file names are derived as ``index + 1`` zero-padded to five
    digits, which presumes an integer index that lines up with the file
    numbering. Confirm against how the CSV is read.

    Args:
        random_transform: Forwarded to ``load_and_preprocess_image``.
        left_noise_type: Noise type applied to left images.
        right_noise_type: Noise type applied to right images.
        num_right_images: Total right images per entry (1 similar + the rest dissimilar).

    Returns:
        list of ``[left_image, right_images]`` entries.

    Raises:
        ValueError: If dissimilar images are requested but no other right image
            exists on disk to draw from.
    """
    image_pairs_with_label = []

    # Directories holding the archive images
    left_image_directory = 'archive/left/left/'  # Update with the correct directory path
    right_image_directory = 'archive/right/right/'  # Update with the correct directory path

    # Range of image numbers from which dissimilar right images are drawn
    min_idx = 1
    max_idx = 6015

    # Only numbers whose right image actually exists can serve as dissimilar
    # examples; drawing from the raw range would fail on a missing file.
    negative_ids = [
        candidate
        for candidate in range(min_idx, max_idx + 1)
        if os.path.exists(os.path.join(right_image_directory, f"{candidate:05d}.jpg"))
    ]

    for index, row in archive_images_df.iterrows():
        # File number for this row (index 0 -> '00001.jpg')
        image_id = index + 1
        image_filename = f"{image_id:05d}.jpg"

        left_image_path = os.path.join(left_image_directory, image_filename)
        right_image_path = os.path.join(right_image_directory, image_filename)

        if not (os.path.exists(left_image_path) and os.path.exists(right_image_path)):
            print(f"Image files not found for index {image_id}: {left_image_path}, {right_image_path}")
            continue

        # Rows that lose the vote are skipped; report them separately so they
        # are not mistaken for missing files.
        if not row['wins'] > row['fails']:
            print(f"Skipping index {image_id}: wins ({row['wins']}) do not exceed fails ({row['fails']})")
            continue

        if num_right_images > 1 and not any(candidate != image_id for candidate in negative_ids):
            raise ValueError("No other right image is available to sample dissimilar examples from")

        left_image = load_and_preprocess_image(left_image_path, random_transform=random_transform,noise_type=left_noise_type)
        right_image = load_and_preprocess_image(right_image_path, random_transform=random_transform,noise_type=right_noise_type)

        # Entry layout: [left image, [similar right image, dissimilar right images...]]
        image_pair_with_label = [left_image, [right_image]]

        for _ in range(num_right_images - 1):  # the similar image already takes one slot
            random_idx = random.choice(negative_ids)
            while random_idx == image_id:  # never reuse the matching right image
                random_idx = random.choice(negative_ids)
            dissimilar_image_path = os.path.join(right_image_directory, f"{random_idx:05d}.jpg")
            dissimilar_image = load_and_preprocess_image(dissimilar_image_path, random_transform=random_transform,noise_type=right_noise_type)
            image_pair_with_label[1].append(dissimilar_image)

        image_pairs_with_label.append(image_pair_with_label)

    return image_pairs_with_label

# Each dataset entry is [left_image, [right_1, ..., right_N]]; with the default N = 20 that is 21 images in total
def display_image_pair(image_pair):
    """Plot a left image next to its right images, ten right images per row.

    ``image_pair`` is ``(left_image, right_images)``. Each grid row starts with
    the left image, followed by up to ten right images titled with their
    1-based position in ``right_images``.
    """
    left_image, right_images = image_pair
    per_row = 10  # right images shown in a single grid row
    total_right = len(right_images)
    grid_cols = per_row + 1  # one extra column for the left image
    grid_rows = (total_right - 1) // per_row + 1

    plt.figure(figsize=(15, 5 * grid_rows))

    for row_no in range(grid_rows):
        first = row_no * per_row
        last = min((row_no + 1) * per_row, total_right)
        row_base = row_no * grid_cols

        # First cell of the row: the left image
        plt.subplot(grid_rows, grid_cols, row_base + 1)
        plt.imshow(left_image)
        plt.title("Left Image")
        plt.axis("off")

        # Remaining cells: this row's slice of right images
        for offset, right_image in enumerate(right_images[first:last]):
            plt.subplot(grid_rows, grid_cols, row_base + offset + 2)
            plt.imshow(right_image)
            plt.title(f"Right Image {first + offset + 1}")
            plt.axis("off")

    plt.tight_layout()
    plt.show()


In [None]:
# Define a function that builds the combined dataset and saves it (used when loading the cached copy fails)
def create_and_save_combined_dataset(output_path='D:/cv_data/train_valid_dataset.joblib'):
    """Build the combined dataset, save it with joblib, and return it.

    The official training pairs and the archive pairs are each built with
    random transforms, Gaussian noise on left images, salt-and-pepper noise on
    right images and 20 right images per entry, then concatenated.

    Args:
        output_path: Destination file for ``joblib.dump``. Defaults to the
            location the notebook loads the cached dataset from.

    Returns:
        The combined dataset list that was written to ``output_path``.
    """
    train_valid_offical_dataset = create_train_valid_dataset(True,'gaussian','salt_and_pepper', 20)
    train_valid_outside_dataset = create_train_valid_dataset_archive(True,'gaussian','salt_and_pepper', 20)
    train_valid_dataset = train_valid_offical_dataset + train_valid_outside_dataset

    # Make sure the target folder exists so the dump does not fail after the
    # expensive dataset build.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save data
    joblib.dump(train_valid_dataset, output_path)
    return train_valid_dataset

# Try to load the cached dataset; build and save it on a cache miss.
# NOTE: joblib.load unpickles the file, so only load cache files you created yourself.
dataset_cache_path = 'D:/cv_data/train_valid_dataset.joblib'
try:
    train_valid_dataset = joblib.load(dataset_cache_path)
    print(f"Loaded dataset size is {len(train_valid_dataset)}")
except FileNotFoundError:
    print("Dataset not found, creating and saving...")
    create_and_save_combined_dataset()
    train_valid_dataset = joblib.load(dataset_cache_path)
    print(f"Created and saved dataset size is {len(train_valid_dataset)}")

# Preview one entry. The index is clamped to the dataset length so a smaller
# dataset does not raise IndexError.
if len(train_valid_dataset) > 0:
    sample_index = min(6000, len(train_valid_dataset) - 1)
    print(train_valid_dataset[sample_index][0].shape)
    display_image_pair(train_valid_dataset[sample_index])
else:
    print("Dataset is empty; nothing to preview")

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt


# SIFT and FLANN
def detect_and_compute_sift_features(img):
    """Detect SIFT interest points and compute their descriptors.

    Args:
        img: RGB image array (H x W x 3) or an already-grayscale array (H x W).

    Returns:
        ``(keypoints, descriptors)`` exactly as returned by
        ``cv2.SIFT.detectAndCompute``. NOTE(review): descriptors may be None
        when no keypoints are found; callers should handle that case.
    """
    # A single-channel image needs no colour conversion, and the RGB->GRAY
    # conversion would reject it.
    gray = img if np.ndim(img) == 2 else cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    sift = cv2.SIFT_create()
    kp, des = sift.detectAndCompute(gray, None)
    return kp, des

def match_features_with_flann(des1, des2):
    """Match two descriptor sets with FLANN and apply Lowe's ratio test.

    Args:
        des1: Query descriptors (one row per keypoint), or None.
        des2: Train descriptors (one row per keypoint), or None.

    Returns:
        ``(matches, matches_mask)``. ``matches`` is the 2-nearest-neighbour
        result of ``knnMatch``. ``matches_mask`` has one ``[a, b]`` entry per
        query, with ``[1, 0]`` where the best match passes the ratio test and
        ``[0, 0]`` otherwise. Both are empty lists when either descriptor set
        is None or empty.
    """
    # No keypoints in either image means there is nothing to match.
    if des1 is None or des2 is None or len(des1) == 0 or len(des2) == 0:
        return [], []

    # FLANN parameters. In OpenCV's FLANN enum the KD-tree index is 1
    # (0 selects the linear index, for which `trees` has no meaning).
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)  # or pass empty dictionary

    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # Need to draw only good matches, so create a mask
    matches_mask = [[0, 0] for _ in range(len(matches))]

    # ratio test as per Lowe's paper
    for i, neighbours in enumerate(matches):
        # Fewer than two neighbours can be returned (e.g. a tiny train set);
        # the ratio test is undefined there, so leave the entry masked out.
        if len(neighbours) < 2:
            continue
        m, n = neighbours
        if m.distance < 0.7 * n.distance:
            matches_mask[i] = [1, 0]

    return matches, matches_mask

def draw_image_matches(img1, kp1, img2, kp2, matches, matches_mask):
    """Render the k-NN matches between two images and show the result.

    Lines for matches are drawn in (0, 255, 0), unmatched single keypoints in
    (255, 0, 0), and only the matches enabled in ``matches_mask`` are drawn.
    """
    rendered = cv2.drawMatchesKnn(
        img1, kp1, img2, kp2, matches, None,
        matchColor=(0, 255, 0),
        singlePointColor=(255, 0, 0),
        matchesMask=matches_mask,
        flags=cv2.DrawMatchesFlags_DEFAULT,
    )
    plt.imshow(rendered)
    plt.show()

# Load images. cv2.imread returns None (it does not raise) when a file is
# missing or unreadable, so check explicitly instead of failing inside cvtColor.
demo_images = []
for demo_path in ('image1.jpg', 'image2.jpg'):
    demo_bgr = cv2.imread(demo_path)
    if demo_bgr is None:
        raise FileNotFoundError(f"Could not read image file: {demo_path}")
    # OpenCV loads BGR; convert to RGB for SIFT helper and matplotlib display
    demo_images.append(cv2.cvtColor(demo_bgr, cv2.COLOR_BGR2RGB))
img1, img2 = demo_images

# Detect and compute SIFT features
kp1, des1 = detect_and_compute_sift_features(img1)
kp2, des2 = detect_and_compute_sift_features(img2)

# Match features using FLANN
matches, matches_mask = match_features_with_flann(des1, des2)

# Draw matches
draw_image_matches(img1, kp1, img2, kp2, matches, matches_mask)


In [None]:
# Load the model: VGG16 with ImageNet weights and without its top
# (include_top=False), then mark it non-trainable so its weights stay fixed.
base_model = VGG16(weights='imagenet', include_top=False)
base_model.trainable=False

In [None]:
# Define the feature extraction model: a sub-model that maps the VGG16 input
# to the output of its 'block5_pool' layer.
feature_extractor = Model(inputs=base_model.input, outputs=base_model.get_layer('block5_pool').output)

In [None]:
def extract_image_features(image, model=feature_extractor):
    """Run a single image through ``model`` and return the resulting features.

    Args:
        image: One image (PIL image or array) accepted by ``img_to_array``.
        model: Keras model used for extraction. The default is bound to the
            ``feature_extractor`` object that exists when this cell is run.

    Returns:
        The array returned by ``model.predict`` for a batch holding only this image.
    """
    # Convert the image pixels to a numpy array
    image = img_to_array(image)
    # Add the leading batch axis the model expects
    batch = np.expand_dims(image, axis=0)
    # NOTE(review): preprocess_input is the one imported from the resnet50
    # module at the top of the notebook, while the default model is built from
    # VGG16; confirm both expect the same preprocessing.
    batch = preprocess_input(batch)
    # `use_multiprocessing` only applies to generator / Sequence inputs and is
    # not accepted by newer Keras `predict`, so it is not passed. verbose=0
    # silences the per-call progress bar when this runs once per image.
    return model.predict(batch, verbose=0)

In [None]:
# Define the distance / similarity measures used to compare image features
from scipy.spatial import distance

def calculate_cosine_distance(left_image, right_image, model):
    """Return the cosine similarity between the two images' flattened features.

    Despite the name, the value comes from ``cosine_similarity``, so larger
    means more alike.
    """
    left_vector, right_vector = (
        extract_image_features(picture, model).flatten()
        for picture in (left_image, right_image)
    )
    similarity_matrix = cosine_similarity([left_vector], [right_vector])
    return similarity_matrix[0][0]

def calculate_euclidean_distance(left_image, right_image, model):
    """Return the Euclidean distance between the two images' flattened features.

    Args:
        left_image: First image, passed to ``extract_image_features``.
        right_image: Second image, passed to ``extract_image_features``.
        model: Model forwarded to ``extract_image_features``.

    Returns:
        The value of ``scipy.spatial.distance.euclidean`` for the two feature vectors.
    """
    left_features = extract_image_features(left_image, model)
    right_features = extract_image_features(right_image, model)
    # SciPy's vector distances expect 1-D inputs; wrapping each vector in a
    # list would hand over a (1, N) array instead.
    return distance.euclidean(left_features.flatten(), right_features.flatten())

def calculate_manhattan_distance(left_image, right_image, model):
    """Return the Manhattan (city-block) distance between the two images' flattened features.

    Args:
        left_image: First image, passed to ``extract_image_features``.
        right_image: Second image, passed to ``extract_image_features``.
        model: Model forwarded to ``extract_image_features``.

    Returns:
        The value of ``scipy.spatial.distance.cityblock`` for the two feature vectors.
    """
    left_features = extract_image_features(left_image, model)
    right_features = extract_image_features(right_image, model)
    # SciPy's vector distances expect 1-D inputs; wrapping each vector in a
    # list would hand over a (1, N) array instead.
    return distance.cityblock(left_features.flatten(), right_features.flatten())



In [None]:
# Split the dataset into training and validation sets (80% for training, 20% for validation).
# random_state=42 fixes the split so it is the same on every run.
train_pairs, valid_pairs = train_test_split(train_valid_dataset, test_size=0.2, random_state=42)

def data_generator(pair_list, batch_size=32, num_right_images=20):
    """Endlessly yield shuffled batches of (left image, right images, labels).

    Args:
        pair_list: Sequence of entries whose item 0 is the left image and item 1
            is the list of right images (similar image first).
        batch_size: Number of entries per batch; the last batch of an epoch
            may be smaller.
        num_right_images: Maximum number of right images taken from each entry.

    Yields:
        ``([left_batch, right_batch], labels)`` where ``labels`` holds 1 for the
        first right image of each entry and 0 for the others.

    Raises:
        ValueError: On first iteration if ``pair_list`` is empty, since the
            epoch loop would otherwise spin forever without yielding.
    """
    if len(pair_list) == 0:
        raise ValueError("pair_list is empty; nothing to generate")

    # Shuffle a private copy so the caller's list keeps its order; positional
    # lookups on it elsewhere stay stable.
    epoch_pairs = list(pair_list)

    while True:
        left_images = []
        right_images = []
        labels = []

        # Shuffle the pairs for each epoch
        random.shuffle(epoch_pairs)

        for pair in epoch_pairs:
            left_image, right_images_list = pair[0], pair[1]
            selected_rights = right_images_list[:num_right_images]

            # One label per right image actually used: 1 for the similar image
            # at index 0, 0 for the dissimilar ones. Deriving the length from
            # the slice keeps labels aligned with entries that hold fewer
            # right images.
            label = [int(position == 0) for position in range(len(selected_rights))]

            left_images.append(left_image)
            right_images.append(selected_rights)
            labels.append(label)

            if len(left_images) >= batch_size:
                yield [np.array(left_images), np.array(right_images)], np.array(labels)
                left_images = []
                right_images = []
                labels = []

        # Yield the remaining data in the last batch
        if len(left_images) > 0:
            yield [np.array(left_images), np.array(right_images)], np.array(labels)

# Take one training entry: its left image and the matching right image (index 0)
left_image = train_pairs[30][0]
right_image = train_pairs[30][1][0]

# draw_image_matches needs both images, their keypoints, the matches and the
# mask, so compute the SIFT features and FLANN matches for this pair first.
kp_left, des_left = detect_and_compute_sift_features(left_image)
kp_right, des_right = detect_and_compute_sift_features(right_image)
pair_matches, pair_matches_mask = match_features_with_flann(des_left, des_right)
draw_image_matches(left_image, kp_left, right_image, kp_right, pair_matches, pair_matches_mask)

print(f"training size is {len(train_pairs)}")
print(f"VGG16 cosine score is {calculate_cosine_distance(left_image,right_image,base_model)}")