In [39]:
# load libraries
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import os
import sys
import argparse
import re
import glob
import matplotlib
from matplotlib import rc
from matplotlib.ticker import MaxNLocator
from matplotlib import rcParams
from matplotlib import cm
from matplotlib.colors import ListedColormap
from matplotlib.colors import Normalize
from matplotlib.colorbar import ColorbarBase
from matplotlib import colors
from matplotlib import colorbar
from matplotlib import patches
from matplotlib import lines
from matplotlib import gridspec
from matplotlib import ticker
from matplotlib import transforms
from matplotlib import font_manager
from matplotlib import animation
from matplotlib import rcParams
from matplotlib import rc
from matplotlib import dates
from PIL import Image, ImageEnhance, ImageFilter, ImageDraw
import cv2
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Flatten, Dense, Lambda, concatenate
from tensorflow.keras.models import Model, load_model
import tensorflow.keras.backend as K
import shutil
from tqdm.notebook import tqdm
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications.vgg16 import VGG16
import contextlib
import io
from scipy.spatial.distance import cosine

In [10]:
class ImageAugmentation:
    """Create augmented JPEG variants of one source image.

    All variants are written to ``<output_dir>/<base_name>/``, where ``base_name``
    is the source file name without its extension.
    """

    # Image modes that are handed to the JPEG writer unchanged. Any other mode
    # (e.g. RGBA or palette images) is converted to RGB first, because saving
    # such an image as ``.jpg`` raises an error.
    _JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    def __init__(self, image_path, output_dir='./images/augmented'):
        """Open ``image_path`` and create the per-image output directory.

        :param image_path: Path of the image to augment.
        :param output_dir: Parent directory; a sub-folder named after the image is created in it.
        """
        self.image = Image.open(image_path)
        self.base_name = os.path.splitext(os.path.basename(image_path))[0]
        self.output_dir = os.path.join(output_dir, self.base_name)
        os.makedirs(self.output_dir, exist_ok=True)

    # ------------------------------------------------------------------ helpers

    def _save_variant(self, image, suffix):
        """Save ``image`` as ``<base_name>_<suffix>.jpg`` inside the output directory."""
        if image.mode not in self._JPEG_MODES:
            image = image.convert("RGB")
        self.save_image(image, os.path.join(self.output_dir, f"{self.base_name}_{suffix}.jpg"))

    def _new_overlay(self):
        """Return ``(rgba_image, transparent_overlay, draw_handle)`` for line drawing."""
        image = self.image.convert("RGBA")
        overlay = Image.new("RGBA", image.size, (255, 255, 255, 0))
        return image, overlay, ImageDraw.Draw(overlay)

    @staticmethod
    def _random_point(width, height):
        """Return a random ``(x, y)`` point; x is drawn before y."""
        x = random.randint(0, width)
        y = random.randint(0, height)
        return (x, y)

    @staticmethod
    def _random_rgba():
        """Return a random colour with alpha in [150, 255]; order of draws is r, g, b, a."""
        r = random.randint(0, 255)
        g = random.randint(0, 255)
        b = random.randint(0, 255)
        a = random.randint(150, 255)
        return (r, g, b, a)

    def bezier_curve(self, p0, p1, p2, t):
        """Calculate a point on a quadratic Bezier curve.

        :param p0: Start point ``(x, y)``.
        :param p1: Control point ``(x, y)``.
        :param p2: End point ``(x, y)``.
        :param t: Curve parameter; 0 yields ``p0`` and 1 yields ``p2``.
        :return: ``(x, y)`` tuple of the point at parameter ``t``.
        """
        w0 = (1 - t) ** 2
        w1 = 2 * (1 - t) * t
        w2 = t ** 2
        return (
            w0 * p0[0] + w1 * p1[0] + w2 * p2[0],
            w0 * p0[1] + w1 * p1[1] + w2 * p2[1],
        )

    def draw_bezier_curve(self, draw, p0, p1, p2, color, thickness, steps=100):
        """Draw a quadratic Bezier curve as ``steps`` consecutive line segments.

        :param draw: Object exposing ``line(points, fill=..., width=...)``.
        :param p0: Start point.
        :param p1: Control point.
        :param p2: End point.
        :param color: Fill colour passed to ``draw.line``.
        :param thickness: Line width passed to ``draw.line``.
        :param steps: Number of segments used for the approximation (default 100).
        """
        prev_point = self.bezier_curve(p0, p1, p2, 0)
        for i in range(1, steps + 1):
            current_point = self.bezier_curve(p0, p1, p2, i / steps)
            draw.line([prev_point, current_point], fill=color, width=thickness)
            prev_point = current_point

    @staticmethod
    def save_image(image, output_path):
        """Save ``image`` to ``output_path``.

        Declared as a static method so it can be called both on the class and
        on an instance (without the decorator an instance call raised a
        ``TypeError`` because ``self`` was passed as ``image``).
        """
        image.save(output_path)

    # ------------------------------------------------------------ augmentations

    def adjust_brightness(self, factor):
        """Save a brightness-adjusted copy (``*_bright.jpg``)."""
        self._save_variant(ImageEnhance.Brightness(self.image).enhance(factor), "bright")

    def adjust_contrast(self, factor):
        """Save a contrast-adjusted copy (``*_contrast.jpg``)."""
        self._save_variant(ImageEnhance.Contrast(self.image).enhance(factor), "contrast")

    def adjust_saturation(self, factor):
        """Save a saturation-adjusted copy (``*_saturated.jpg``)."""
        self._save_variant(ImageEnhance.Color(self.image).enhance(factor), "saturated")

    def add_gaussian_noise(self, mean=0, var=0.1):
        """Save a copy with additive Gaussian noise (``*_noisy.jpg``).

        :param mean: Mean of the noise.
        :param var: Variance of the noise (on the 0-255 pixel scale).
        """
        source = self.image
        if source.mode not in self._JPEG_MODES:
            source = source.convert("RGB")
        img = np.array(source)
        sigma = var ** 0.5
        # Use the array's own shape so 2-D (single channel) images work as well
        # as 3-D ones; the original unpacked three dimensions and crashed otherwise.
        gauss = np.random.normal(mean, sigma, img.shape)
        noisy_image = np.clip(img + gauss, 0, 255).astype(np.uint8)
        self._save_variant(Image.fromarray(noisy_image), "noisy")

    def blur_image(self, radius=2):
        """Save a Gaussian-blurred copy (``*_blurred.jpg``)."""
        self._save_variant(self.image.filter(ImageFilter.GaussianBlur(radius)), "blurred")

    def add_curved_lines(self, num_lines=5, thickness=3):
        """Return an RGB copy with ``num_lines`` random-coloured Bezier curves drawn on it.

        Unlike the other augmentations this returns the image instead of saving it.
        """
        image, overlay, draw = self._new_overlay()
        width, height = image.size

        for _ in range(num_lines):
            p0 = self._random_point(width, height)
            p1 = self._random_point(width, height)
            p2 = self._random_point(width, height)
            line_color = self._random_rgba()
            self.draw_bezier_curve(draw, p0, p1, p2, line_color, thickness)

        return Image.alpha_composite(image, overlay).convert("RGB")

    def add_straight_lines(self, num_lines=5, thickness=3):
        """Return an RGB copy with ``num_lines`` random-coloured straight lines drawn on it.

        Unlike the other augmentations this returns the image instead of saving it.
        """
        image, overlay, draw = self._new_overlay()
        width, height = image.size

        for _ in range(num_lines):
            x1, y1 = self._random_point(width, height)
            x2, y2 = self._random_point(width, height)
            line_color = self._random_rgba()
            draw.line((x1, y1, x2, y2), fill=line_color, width=thickness)

        return Image.alpha_composite(image, overlay).convert("RGB")

    def add_lines(self, num_lines=5, thickness=3, ratio_curved=0.5, color=(0, 0, 0), count=0):
        """Save a copy with a random mix of curved and straight lines (``*_lines_<count>.jpg``).

        :param num_lines: Number of lines to draw.
        :param thickness: Maximum line width; each line gets a random width in [1, thickness].
        :param ratio_curved: Probability that a given line is a Bezier curve.
        :param color: RGB colour of the lines; a random alpha in [150, 255] is appended.
        :param count: Index used in the output file name.
        """
        image, overlay, draw = self._new_overlay()
        width, height = image.size

        for _ in range(num_lines):
            # Order of random draws: alpha, width, curved-or-straight, then points.
            line_color = tuple(color) + (random.randint(150, 255),)
            thickness_random = random.randint(1, thickness)

            if random.random() < ratio_curved:
                p0 = self._random_point(width, height)
                p1 = self._random_point(width, height)
                p2 = self._random_point(width, height)
                self.draw_bezier_curve(draw, p0, p1, p2, line_color, thickness_random)
            else:
                x1, y1 = self._random_point(width, height)
                x2, y2 = self._random_point(width, height)
                draw.line((x1, y1, x2, y2), fill=line_color, width=thickness_random)

        combined_image = Image.alpha_composite(image, overlay)
        self._save_variant(combined_image.convert("RGB"), f"lines_{count}")

    def create_all_images(self):
        """Write the full set of variants: 5 photometric ones plus 50 line overlays."""
        self.adjust_brightness(1.5)
        self.adjust_contrast(1.5)
        self.adjust_saturation(1.5)
        self.add_gaussian_noise(mean=0, var=30)
        self.blur_image(radius=1)
        for i in range(50):
            amount_of_lines = random.randint(1, 20)
            ratio_curved = random.uniform(0, 1)
            self.add_lines(num_lines=amount_of_lines, thickness=3, ratio_curved=ratio_curved, color=(0, 0, 0), count=i)

# create the invariants

For now, only a random subset of 1000 images is used (each image gets its own folder of augmented variants).

In [None]:
# Copy a random sample of images (default: 1000) from ./images/original/ to ./images/original_test/
def copy_images_to_test(source_dir='./images/original', target_dir='./images/original_test',
                        num_images=1000, pattern='*.jpg'):
    """
    Copy a random sample of images into a test folder.

    NOTE: running this more than once draws a new sample each time and adds it to
    ``target_dir``, so the folder can end up with more than ``num_images`` files.

    :param source_dir: Folder that is searched for images.
    :param target_dir: Folder the sampled images are copied to (created if missing).
    :param num_images: Maximum number of images to copy.
    :param pattern: Glob pattern selecting the image files inside ``source_dir``.
    """
    os.makedirs(target_dir, exist_ok=True)
    # Sort first: glob order depends on the file system, which would make a
    # seeded shuffle non-reproducible across machines.
    image_files = sorted(glob.glob(os.path.join(source_dir, pattern)))
    random.shuffle(image_files)
    for image_file in image_files[:max(0, num_images)]:
        shutil.copy(image_file, target_dir)

copy_images_to_test()

### create the invariants

In [11]:
# Build the augmented variants for every image found in ./images/original_test/
image_files = glob.glob('./images/original_test/*.jpg')
progress = tqdm(image_files)  # progress bar over the selected originals
for image_file in progress:
    # One ImageAugmentation per source image; its variants go to a sub-folder
    # of ./images/augmented_test named after the image.
    image_augmentation = ImageAugmentation(image_file, output_dir='./images/augmented_test')
    image_augmentation.create_all_images()

  0%|          | 0/1000 [00:00<?, ?it/s]

# create triplets

In [13]:
def prepare_triplets(image_folder_original, image_folder_augmented, num_triplets_per_anchor=10):
    """
    Prepare multiple triplets for training. This function creates multiple triplets (anchor, positive, negative)
    for each original image using various augmented images and different negative samples.

    Originals without an augmented folder (or with an empty one) are skipped with a printed message.
    File listings are sorted so that a seeded ``random`` produces the same triplets on every run.

    :param image_folder_original: Folder containing original images.
    :param image_folder_augmented: Folder containing augmented images in subfolders named after each original image.
    :param num_triplets_per_anchor: Number of triplets to generate per original image.
    :return: Arrays of anchor, positive, and negative images.
    :raises ValueError: If a triplet has to be built but there is no second original image to use as negative.
    """
    anchor_images = []
    positive_images = []
    negative_images = []

    # List all original images (sorted: os.listdir order is arbitrary)
    original_images = [
        os.path.join(image_folder_original, f)
        for f in sorted(os.listdir(image_folder_original))
        if f.endswith('.jpg')
    ]
    num_originals = len(original_images)

    # Create multiple triplets for each anchor
    for anchor_index, original_image in enumerate(tqdm(original_images)):
        image_name = os.path.splitext(os.path.basename(original_image))[0]
        augmented_image_folder = os.path.join(image_folder_augmented, image_name)

        if not os.path.exists(augmented_image_folder):
            print(f"No augmented image folder found for {original_image}")
            continue  # Skip if no augmented folder

        # List all augmented images for this original image
        positive_images_paths = [
            os.path.join(augmented_image_folder, f)
            for f in sorted(os.listdir(augmented_image_folder))
            if f.endswith('.jpg')
        ]

        if not positive_images_paths:
            print(f"No augmented images found for {original_image}")
            continue  # Skip if no augmented versions

        if num_originals < 2:
            raise ValueError(
                f"Cannot pick a negative sample: {image_folder_original} contains only one original image"
            )

        # Decode the anchor only once we know it will actually be used.
        anchor = preprocess_image(original_image)

        for _ in range(num_triplets_per_anchor):
            # Select a random positive sample
            positive = preprocess_image(random.choice(positive_images_paths))

            # Select a random negative sample (from another original image):
            # draw an index among the other n-1 images and step over the anchor's
            # own slot, instead of rebuilding a filtered list for every triplet.
            negative_index = random.randrange(num_originals - 1)
            if negative_index >= anchor_index:
                negative_index += 1
            negative = preprocess_image(original_images[negative_index])

            # Append the triplet to the lists
            anchor_images.append(anchor)
            positive_images.append(positive)
            negative_images.append(negative)

    return np.array(anchor_images), np.array(positive_images), np.array(negative_images)

def preprocess_image(image_path):
    """
    Read an image from disk and turn it into a model-ready array (no batch axis).

    NOTE(review): a later cell defines another ``preprocess_image`` that adds a batch
    dimension and shadows this one once it has been run.

    :param image_path: Path to the image.
    :return: Preprocessed image array.
    """
    # Resize to 224x224 on load, convert to an array, then apply the model's input preprocessing.
    resized = load_img(image_path, target_size=(224, 224))
    return preprocess_input(img_to_array(resized))

# Locations of the original images and of their augmented variants
original_image_folder = './images/original_test/'
augmented_image_folder = './images/augmented_test/'

# Build one triplet per anchor and unpack the three image arrays
triplets = prepare_triplets(
    original_image_folder,
    augmented_image_folder,
    num_triplets_per_anchor=1,
)
anchor_images, positive_images, negative_images = triplets

print(f'Generated {len(anchor_images)} triplets for training.')

  0%|          | 0/1000 [00:00<?, ?it/s]

Generated 1000 triplets for training.


#  train model

## functions

In [16]:
def create_base_network(input_shape, embedding_dim=256, weights='imagenet', freeze_backbone=False):
    """
    Base network to be shared (Siamese network).

    A VGG16 convolutional backbone (without its classification head) followed by
    ``Flatten`` and a ReLU ``Dense`` layer that produces the embedding.

    :param input_shape: Shape of one input image, e.g. ``(224, 224, 3)``.
    :param embedding_dim: Size of the embedding vector (default 256).
    :param weights: Passed to ``VGG16(weights=...)`` (default ``'imagenet'``).
    :param freeze_backbone: If True, the VGG16 layers are made non-trainable so that
        only the embedding layer is trained. Default False keeps every layer trainable.
    :return: Keras model mapping an image batch to embeddings.
    """
    base_model = VGG16(weights=weights, include_top=False, input_shape=input_shape)
    if freeze_backbone:
        base_model.trainable = False

    features = Flatten()(base_model.output)
    embedding = Dense(embedding_dim, activation='relu')(features)

    return Model(inputs=base_model.input, outputs=embedding)


def triplet_loss(y_true, y_pred, alpha=0.2):
    """
    Triplet loss function.

    :param y_true: Not used (required by the Keras loss signature).
    :param y_pred: Tensor of shape (batch, 3 * d) holding the anchor, positive and
        negative embeddings concatenated along the last axis, in that order.
    :param alpha: Margin added to the distance difference (default 0.2).
    :return: Per-sample loss ``max(||a - p||^2 - ||a - n||^2 + alpha, 0)``.
    :raises ValueError: If the last dimension is unknown or not divisible by 3.
    """
    # Index the shape directly so both TensorShape objects and plain tuples work.
    total_length = y_pred.shape[-1]
    if total_length is None or int(total_length) % 3 != 0:
        raise ValueError(
            f"triplet_loss expects a last dimension divisible by 3, got {total_length}"
        )
    embedding_size = int(total_length) // 3

    # Split the encoding into anchor, positive and negative encodings
    anchor = y_pred[:, :embedding_size]
    positive = y_pred[:, embedding_size:2 * embedding_size]
    negative = y_pred[:, 2 * embedding_size:]

    # Triplet Loss Formula (squared Euclidean distances)
    pos_dist = K.sum(K.square(anchor - positive), axis=1)
    neg_dist = K.sum(K.square(anchor - negative), axis=1)
    basic_loss = pos_dist - neg_dist + alpha
    loss = K.maximum(basic_loss, 0.0)

    return loss

In [17]:
# Shape of one input image: 224x224 pixels with 3 channels (matches the
# target_size used when images are loaded in preprocess_image).
input_shape = (224, 224, 3)
# Build the embedding network once; the same instance is applied to the
# anchor, positive and negative inputs so that all three share weights.
base_network = create_base_network(input_shape)

In [18]:
# One input tensor per member of the triplet
anchor_input = Input(shape=input_shape, name="anchor_input")
positive_input = Input(shape=input_shape, name="positive_input")
negative_input = Input(shape=input_shape, name="negative_input")
triplet_inputs = [anchor_input, positive_input, negative_input]

# Run every input through the same (weight-sharing) base network
encoded_anchor, encoded_positive, encoded_negative = [
    base_network(tensor) for tensor in triplet_inputs
]

# Put the three embeddings side by side so the loss can split them again
merged_vector = concatenate(
    [encoded_anchor, encoded_positive, encoded_negative],
    axis=1,
)

# Model taking (anchor, positive, negative) and returning the merged embeddings
triplet_model = Model(inputs=triplet_inputs, outputs=merged_vector)

# Train with the custom triplet loss
triplet_model.compile(optimizer='adam', loss=triplet_loss)

In [19]:
# Report the shape of each triplet array (anchor, positive, negative)
for label, batch in (
    ('Anchor', anchor_images),
    ('Positive', positive_images),
    ('Negative', negative_images),
):
    print(f'{label} images shape: {batch.shape}')

Anchor images shape: (1000, 224, 224, 3)
Positive images shape: (1000, 224, 224, 3)
Negative images shape: (1000, 224, 224, 3)


In [20]:
# The triplet loss ignores y_true, so an all-zero placeholder with one row per
# triplet is passed as the target.
dummy_labels = np.zeros((anchor_images.shape[0], 1))
triplet_inputs_data = [anchor_images, positive_images, negative_images]

triplet_model.fit(triplet_inputs_data, dummy_labels, batch_size=32, epochs=10)

Epoch 1/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m792s[0m 24s/step - loss: 441588.3125
Epoch 2/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m785s[0m 24s/step - loss: 603.6125
Epoch 3/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m787s[0m 25s/step - loss: 224.2026
Epoch 4/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m790s[0m 25s/step - loss: 1241.5020
Epoch 5/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m786s[0m 25s/step - loss: 155.9232
Epoch 6/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m788s[0m 25s/step - loss: 2.4176
Epoch 7/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m886s[0m 28s/step - loss: 0.5998
Epoch 8/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m812s[0m 25s/step - loss: 0.2240
Epoch 9/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m796s[0m 25s/step - loss: 0.0631
Epoch 10/10
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m786s[0m

<keras.src.callbacks.history.History at 0x19455f77e30>

In [21]:
# Define the embedding model using the trained base network.
# It reuses base_network's input and output tensors, so it maps a single image
# batch to its embedding with the weights learned during triplet training.
embedding_model = Model(inputs=base_network.input, outputs=base_network.output)

In [23]:
# Create the target folder first so saving does not fail on a fresh checkout.
os.makedirs('./models', exist_ok=True)
embedding_model.save('./models/embedding_model_01.keras')

In [26]:
# Reload the saved embedding model from disk.
# NOTE(review): the cells below call `embedding_model`, not `loaded_embedding_model`;
# on a fresh kernel without training, `embedding_model` would be undefined — confirm which is intended.
loaded_embedding_model = load_model('./models/embedding_model_01.keras')

# get embeddings

In [27]:
def preprocess_image(image_path):
    """
    Read an image from disk and return it as a single-image batch for the embedding model.

    NOTE(review): this redefines the ``preprocess_image`` from the triplet cell, which
    returns the array WITHOUT a batch axis; whichever cell ran last wins.

    :param image_path: Path to the image.
    :return: Preprocessed image array.
    """
    pixels = img_to_array(load_img(image_path, target_size=(224, 224)))  # resized to the model input size
    batch = pixels[np.newaxis, ...]  # prepend the batch dimension
    return preprocess_input(batch)  # VGG16 input preprocessing

In [44]:
# Folder containing original images
original_image_folder = './images/original_test/'

# Dictionary to store image embeddings (image path -> 1-D embedding vector)
embeddings_dict = {}

# List all original images
original_images = [os.path.join(original_image_folder, f) for f in os.listdir(original_image_folder) if f.endswith('.jpg')]

for image_path in tqdm(original_images):
    # Preprocess the image (expects the batched preprocess_image defined above)
    preprocessed_image = preprocess_image(image_path)

    # Compute the embedding; verbose=0 silences the per-call progress bar,
    # replacing the earlier stdout redirection.
    embedding = embedding_model.predict(preprocessed_image, verbose=0).flatten()

    # Store the embedding in the dictionary with the image path as the key
    embeddings_dict[image_path] = embedding

# Create the output folder first so np.savez does not fail if it is missing.
os.makedirs('./embeddings_augmented', exist_ok=True)
np.savez('./embeddings_augmented/embeddings_dict_03.npz', **embeddings_dict)

  0%|          | 0/1000 [00:00<?, ?it/s]

In [37]:
# Persist the embeddings; each dictionary key (image path) becomes one array in the archive.
np.savez('./embeddings_augmented/embeddings_dict_01.npz', **embeddings_dict)

# To load the embeddings again:
# loaded = np.load('./embeddings_augmented/embeddings_dict_01.npz')
# Convert the loaded data back to a dictionary:
# embeddings_dict = {key: loaded[key] for key in loaded}

# test model

In [48]:
def find_most_similar_image(new_image_embedding, embeddings_dict):
    """
    Find the stored image whose embedding has the smallest cosine distance to a query embedding.

    Embeddings with zero norm have no defined direction and are never selected.

    :param new_image_embedding: Query embedding (any shape; it is flattened).
    :param embeddings_dict: Mapping of image path -> embedding (any shape; flattened).
    :return: ``(image_path, cosine_distance)`` of the best match, or ``(None, inf)`` if the
        mapping is empty or no comparable (non-zero) pair exists.
    """
    query = np.asarray(new_image_embedding, dtype=np.float64).ravel()

    paths = []
    vectors = []
    for image_path, embedding in embeddings_dict.items():
        paths.append(image_path)
        vectors.append(np.asarray(embedding, dtype=np.float64).ravel())

    if not paths:
        return None, float('inf')

    # One matrix product instead of one scipy call per stored image.
    matrix = np.stack(vectors)
    denominators = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
    comparable = denominators > 0
    if not comparable.any():
        return None, float('inf')

    distances = np.full(len(paths), np.inf)
    similarities = (matrix[comparable] @ query) / denominators[comparable]
    # Clip to the valid cosine-distance range to absorb floating point round-off.
    distances[comparable] = np.clip(1.0 - similarities, 0.0, 2.0)

    best = int(np.argmin(distances))  # first minimum wins, like the original loop
    return paths[best], float(distances[best])

# Example usage: embed one augmented image and look up the closest original
new_image_path = './images/augmented_test/Aaland_181874_MI-s/Aaland_181874_MI-s_contrast.jpg'

# The preprocess_image from the "get embeddings" cell already returns a
# 4D array (batch size, height, width, channels), so no expand_dims is needed here.
new_image_preprocessed = preprocess_image(new_image_path)

# Embed the image and flatten the result to a 1-D vector
new_image_embedding = embedding_model.predict(new_image_preprocessed).flatten()

# Find the most similar image among the stored embeddings
most_similar_image, distance = find_most_similar_image(
    new_image_embedding,
    embeddings_dict,
)
print(f'The most similar image is {most_similar_image} with a distance of {distance}')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 166ms/step
The most similar image is ./images/original_test/Cuba_66467_EWO-s.jpg with a distance of 0.0
