# Experiment to showcase duplicate detection

This notebook shows a step-by-step approach to duplicate detection.
It requires that the BUBE App is already running.

## Settings and Imports

In [None]:
import importlib.resources as impresources
import os
import random

import requests
from PIL import Image, ImageOps, ImageEnhance, ImageFilter

In [None]:
# Base URL of the running BUBE app. The request helpers in this notebook
# append the endpoint paths (`/feex/insert` and `/feex`) to it.
BUBE_URL = "http://localhost:8000"

## Data Preparation

In the folder `tests/test_assets` are 5 images that are duplicates of each other.
These are called **feex_check00<NUMBER>.jpg**

In [None]:
# Filesystem path of the `test_assets` folder inside the `tests` package.
image_root = str(impresources.files("tests") / "test_assets")
# Filenames of the original images: feex_check001.jpg ... feex_check005.jpg
filenames_assets = [f"feex_check00{i}.jpg" for i in range(1, 6)]

For each image, we create multiple variants to test the duplicate detection.
The variants are created by the `add_variants_of_img()` function by applying the following transformations:
- Resize to fit within 256x256 and 1024x1024 (the aspect ratio is preserved)
- Rotation
- Horizontal and Vertical Flip
- Cropping
- Contrast, Brightness, Saturation
- Grayscale
- Blur
- Edge Enhance
- Random Rotation

The resulting variants are stored in the same folder as the original images.

In [None]:
def add_variants_of_img(path: str) -> None:
    """Create transformed variants of one image and save them next to it.

    Every variant is written to the directory of ``path`` under the name
    ``<stem>_<variant><extension>`` (for example ``feex_check001_blur.jpg``).
    In total 14 files are written per source image.

    Args:
        path: Path of the source image. Its extension is reused for all
            variants, so it also determines the output format Pillow picks.
    """
    # Split off only the final extension. A plain ``str.replace`` on the whole
    # path would also rewrite matching text inside directory names.
    stem, file_ending = os.path.splitext(path)

    def save_variant(img, suffix: str) -> None:
        """Save ``img`` as ``<stem>_<suffix><extension>``."""
        img.save(f"{stem}_{suffix}{file_ending}")

    # ``convert`` returns a new, fully loaded image, so the handle on the
    # source file can be released immediately.
    with Image.open(path) as source:
        img_og = source.convert("RGB")

    # Unmodified copy (re-encoded by Pillow on save).
    save_variant(img_og, "duplicate")

    # ``ImageOps.contain`` fits the image inside the box, keeping aspect ratio.
    save_variant(ImageOps.contain(img_og, (256, 256)), "resize_small")
    save_variant(ImageOps.contain(img_og, (1024, 1024)), "resize")

    # NOTE(review): the target size is (height, width), i.e. the dimensions are
    # swapped, which distorts non-square images - confirm this is intended.
    save_variant(img_og.rotate(20).resize((img_og.height, img_og.width)), "rotate")

    save_variant(ImageOps.mirror(img_og), "mirror")
    save_variant(ImageOps.flip(img_og), "flip")

    # Central crop: keep the middle half of the image in both dimensions.
    width, height = img_og.size
    crop_box = (width // 4, height // 4, 3 * width // 4, 3 * height // 4)
    save_variant(img_og.crop(crop_box), "crop")

    save_variant(ImageEnhance.Contrast(img_og).enhance(1.5), "contrast")
    save_variant(ImageEnhance.Brightness(img_og).enhance(0.7), "brightness")
    save_variant(ImageEnhance.Color(img_og).enhance(1.5), "saturation")

    save_variant(ImageOps.grayscale(img_og), "grayscale")
    save_variant(img_og.filter(ImageFilter.GaussianBlur(radius=2)), "blur")
    save_variant(img_og.filter(ImageFilter.EDGE_ENHANCE), "edge_enhance")

    # ``expand=True`` enlarges the canvas so the rotated image is not clipped.
    angle = random.randint(0, 360)
    save_variant(img_og.rotate(angle, expand=True), "random_rotate")

In [None]:
# Generate and save all variants for each of the original asset images.
for filename in filenames_assets:
    add_variants_of_img(f"{image_root}/{filename}")

In [None]:
# Filenames of all image variants in the asset folder; the original asset
# images themselves are excluded.
filenames_variants = [
    entry
    for entry in os.listdir(image_root)
    if entry.lower().endswith((".jpg", ".jpeg", ".png", ".heif"))
    and entry not in filenames_assets
]

## Data Upload

We store all variants of the images in the Vector Database without the original images.
These will be used afterward for the duplication check.

We send the images as binaries to the `feex/insert` endpoint of the BUBE App.

In [None]:
# Maps a lower-case file extension (without the dot) to its MIME type.
mime_types = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "heif": "image/heif"}


def send_image_to_bube(image_folder_path: str, filenames: list[str]):
    """Upload the given images to the ``feex/insert`` endpoint in one request.

    All files are sent as binary multipart parts under the field name
    ``images``. Request errors are reported on stdout instead of being raised.

    Args:
        image_folder_path: Folder that contains the image files.
        filenames: Names of the files inside ``image_folder_path`` to upload.

    Raises:
        OSError: If one of the files cannot be opened. Handles opened before
            the failure are closed.
    """
    image_files = []

    # The outer try/finally covers the opening loop as well, so handles that
    # were already opened are closed even if a later ``open`` fails.
    try:
        for file in filenames:
            file_path = os.path.join(image_folder_path, file)
            # Lower-case the extension so that e.g. "IMG.PNG" is not sent with
            # the JPEG fallback type.
            extension = file.rsplit(".", 1)[-1].lower()
            mime_type = mime_types.get(extension, "image/jpeg")
            image_files.append(("images", (file, open(file_path, "rb"), mime_type)))

        if not image_files:
            print("No Images with specified filenames could be found.")
            return

        try:
            response = requests.post(f"{BUBE_URL}/feex/insert", files=image_files)
            response.raise_for_status()
            print("Images successfully uploaded.")
        except requests.exceptions.RequestException:
            print(f"Request failed. Is the BUBE App running on {BUBE_URL}?")
    finally:
        # close file handlers
        for _, file_tuple in image_files:
            file_tuple[1].close()

In [None]:
# Upload every generated variant (the original images are excluded).
send_image_to_bube(image_root, filenames_variants)

## Duplicate Detection

After adding the different variants, we can now check for duplicates.
Each variant should be detected as a duplicate of the original image.

In [None]:
def duplicate_check_for_image(image_folder_path: str, img_filename: str):
    """Send one image to the ``feex`` endpoint and return the JSON response.

    Args:
        image_folder_path: Folder that contains the image file.
        img_filename: Name of the image file inside ``image_folder_path``.

    Returns:
        The decoded JSON body of the response, or ``None`` if the request
        failed (a message is printed in that case).

    Raises:
        OSError: If the image file cannot be opened.
    """
    img_filepath = os.path.join(image_folder_path, img_filename)
    # Derive the MIME type the same way the upload helper does, instead of
    # declaring every file as JPEG.
    extension = img_filename.rsplit(".", 1)[-1].lower()
    mime_type = mime_types.get(extension, "image/jpeg")

    with open(img_filepath, "rb") as image_file:
        files = [("images", (img_filename, image_file, mime_type))]
        try:
            response = requests.post(f"{BUBE_URL}/feex", files=files)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException:
            print(f"Request failed. Is the BUBE App running on {BUBE_URL}?")
            return None


def check_if_all_variants_are_detected(image_folder_path: str, img_filename: str, expected_variants: int = 14):
    """Check whether enough duplicates are reported for one original image.

    Runs the duplicate check for the image, prints the reported number of
    duplicates and compares it with ``expected_variants``.

    Args:
        image_folder_path: Folder that contains the image file.
        img_filename: Name of the original image to check.
        expected_variants: Minimum number of duplicates that must be reported.
            The default of 14 is the number of variant files that
            ``add_variants_of_img`` writes per image.

    Returns:
        ``True`` if at least ``expected_variants`` duplicates were reported,
        ``False`` otherwise (including a failed request or an empty response).
    """
    res = duplicate_check_for_image(image_folder_path, img_filename)
    if res is None:
        # The request failed; the helper has already printed a message.
        return False
    if not res:
        # An empty response has no first entry to read the count from.
        print(f"No duplicate information returned for {img_filename}.\n")
        return False

    # The count is read from the first entry of the response.
    num_of_duplicates = res[0]["duplicates"]["num_of_files"]
    print(f"The number of duplicates for {img_filename} is {num_of_duplicates}\n")
    return num_of_duplicates >= expected_variants

In [None]:
# Run the duplicate check for every original image. The boolean result is
# not used here; the printed duplicate count is the visible output.
for filename in filenames_assets:
    check_if_all_variants_are_detected(image_folder_path=image_root, img_filename=filename)