## IMAGE EMBEDDER
This notebook implements a complete pipeline to **download a batch of images from a URL list**, use the **OpenAI CLIP (ViT-B/32) model** to **convert them into vector embeddings**, and finally **merge all vectors** into a single HDF5 file.

### 1. Initialization and Setup
Installs all necessary Python libraries, including **torch**, **h5py**, **gdown** *(for Google Drive downloads)*, **Pillow** *(for image processing)*, **ipywidgets** *(for tqdm)*, and most importantly, **clip** *(installed directly from OpenAI's GitHub)*.

In [1]:
!pip install requests boto3 h5py gdown torch tqdm numpy Pillow ipywidgets git+https://github.com/openai/CLIP.git

Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-sevti835
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-sevti835
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... done
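
Before moving on, it can help to confirm that the packages installed above are importable in the current kernel. The following is a minimal sketch, not part of the original notebook: the helper name `missing_modules` is hypothetical, and the list uses the usual import names for these packages (for example `PIL` for Pillow).

```python
import importlib.util

def missing_modules(names):
    """Return the subset of top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Import names differ from some package names (Pillow is imported as PIL).
required = ["requests", "boto3", "h5py", "gdown", "torch", "tqdm", "numpy", "PIL", "clip"]
missing = missing_modules(required)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All required modules are importable.")
```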


In [2]:
import json
import h5py
from concurrent.futures import ThreadPoolExecutor
import subprocess
import clip
import gdown
from PIL import Image
import torch
from tqdm.auto import tqdm
import numpy as np
import os

### 2. Data and Tool Preparation

Downloads the `downloader.py` utility script, part of the **Open Images dataset** toolkit. This script is used for efficient, parallel downloading of the dataset images and is stored in the `.cache` directory.

In [3]:
# Ensure the local cache directory exists.
os.makedirs(".cache", exist_ok=True)

# Check if the downloader script already exists in the cache.
if not os.path.isfile(".cache/downloader.py"):
    # If not present, download the script from the remote repository.
    command = ["wget", "-O", ".cache/downloader.py", "https://raw.githubusercontent.com/openimages/dataset/master/downloader.py"]
    # Execute the download command and ensure it runs successfully.
    subprocess.run(command, check=True)
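
One caveat with the cell above: `wget -O` can leave an empty or truncated output file behind when the transfer fails, so the `os.path.isfile` check may pass on a later run even though the script is unusable. A minimal sketch of one way around this, using only the standard library, downloads to a temporary name and renames it on success. The helper name `fetch_if_missing` and the `.part` suffix are hypothetical choices, not part of the original notebook.

```python
import os
import shutil
import urllib.request

def fetch_if_missing(url, dest):
    """Download url to dest unless dest already exists.

    Returns True if a download happened, False if dest was already present.
    """
    if os.path.isfile(dest):
        return False
    os.makedirs(os.path.dirname(dest) or ".", exist_ok=True)
    tmp_path = dest + ".part"  # hypothetical temporary suffix
    try:
        with urllib.request.urlopen(url) as response, open(tmp_path, "wb") as out:
            shutil.copyfileobj(response, out)
        # The rename happens only after the full body was written.
        os.replace(tmp_path, dest)
    finally:
        # Remove a leftover partial file if the download failed midway.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return True
```

Calling it with the script URL and `.cache/downloader.py` as arguments would stand in for the `wget` call.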

Downloads metadata files (`image_ids.json`, `image_urls.json`) derived from the **Open Images V7 dataset**, hosted on a Google Drive mirror. This data is used in accordance with the original **CC BY 4.0 License**.

In [4]:
# Create the main directory for raw dataset storage.
# 'exist_ok=True' prevents errors if the folder is already present.
os.makedirs("RAW_DATASET", exist_ok=True)

# Check if the 'image_ids' file exists. If not, download it from Google Drive.
if not os.path.isfile("RAW_DATASET/image_ids.json"):
    # Download file using its Google Drive ID.
    gdown.download(id='1-HcMviWpMn84cDaDkDpPXqEr-6BFc0bv', output='RAW_DATASET/image_ids.json', quiet=False)

# Check if the 'image_urls' file exists. If not, download it from Google Drive.
if not os.path.isfile("RAW_DATASET/image_urls.json"):
    # Download file using its Google Drive ID.
    gdown.download(id='16F8VRuKZb4SJ0KLa1Oh281RvdmQzj6Ft', output='RAW_DATASET/image_urls.json', quiet=False)

### 3. Model Loading and Function Definitions

In [5]:
def read_json(file_path):
    # Open the JSON file for reading, ensuring UTF-8 support.
    with open(file_path, 'r', encoding='utf-8') as f:
        # Load and return the data as a Python list.
        data = json.load(f)
        return data

In [6]:
# Load image identifiers from the local JSON file.
image_IDs = read_json("RAW_DATASET/image_ids.json")

# Load the corresponding image URLs from the local JSON file.
image_URLs = read_json("RAW_DATASET/image_urls.json")

# Determine the total number of images to be processed (based on the number of URLs).
n_images = len(image_URLs)

# Set the device for computation: use GPU ("cuda") if available, otherwise fallback to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the pre-trained CLIP model (ViT-B/32) onto the selected device.
# 'model' is the main neural network, and 'preprocess' is the required image transformation pipeline.
model, preprocess = clip.load("ViT-B/32", device=device)
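
Later cells index `image_IDs` and `image_URLs` with the same position `i`, so the two lists are assumed to be parallel. A small sketch of a sanity check that could run right after loading is shown here; the function name `check_parallel` is hypothetical, and the check assumes both files decode to JSON lists.

```python
def check_parallel(ids, urls):
    """Raise ValueError if the two lists cannot be indexed in lockstep."""
    if not isinstance(ids, list) or not isinstance(urls, list):
        raise ValueError("expected both inputs to be lists")
    if len(ids) != len(urls):
        raise ValueError(f"length mismatch: {len(ids)} IDs vs {len(urls)} URLs")
    return len(ids)

# Toy data standing in for the real metadata files.
n = check_parallel(
    ["id_a", "id_b"],
    ["http://example.com/a.jpg", "http://example.com/b.jpg"],
)
print(n)
```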

In [7]:
def load_and_preprocess_image(i):
    """
    Loads an image file, applies necessary preprocessing (from CLIP), 
    and returns the resulting tensor along with its URL.
    """
    # Construct the full path to the image using its ID.
    image_path = f".cache/Images/{image_IDs[i]}.jpg"
    
    # Check if the file exists on the local filesystem.
    if os.path.exists(image_path):
        try:
            # Open the image file using PIL; the context manager closes the file handle afterwards.
            with Image.open(image_path) as image:
                # Apply the standard CLIP preprocessing pipeline.
                # 'preprocess' returns a PyTorch tensor on the CPU.
                image_tensor = preprocess(image)
            
            # Return the original URL and the processed tensor.
            return image_URLs[i], image_tensor
        
        except Exception as e:
            # Catch exceptions (e.g., File is corrupted, not a valid image format).
            # Print an error message and skip the problematic image.
            print(f"Error processing image {image_path}: {e}")
            return None
    
    # Return None if the image file was not found at the specified path.
    return None
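
Because `load_and_preprocess_image` returns `None` for missing or unreadable files, callers need to filter its results. The sketch below shows that pattern with `ThreadPoolExecutor.map`, which yields results in input order. The helper `load_all` and the loader `fake_loader` are hypothetical stand-ins so that the example runs without images or CLIP.

```python
from concurrent.futures import ThreadPoolExecutor

def load_all(indices, loader, max_workers=8):
    """Run loader over indices in worker threads and drop None results.

    executor.map yields results in the order of the inputs, so the output
    order matches the order of the indices that succeeded.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return [result for result in executor.map(loader, indices) if result is not None]

def fake_loader(i):
    # Hypothetical stand-in: pretend that every third item is missing.
    if i % 3 == 0:
        return None
    return (f"url_{i}", i * 10)

loaded = load_all(range(7), fake_loader)
print(loaded)
```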

In [8]:
def merge_HDF5_files(input_list, output_file):
    """
    Merges data (URLs and embeddings) from multiple HDF5 files into a single output file.
    """
    if not input_list:
        print("❌ Error: Input file list is empty.")
        return

    total_records = 0

    # 1. Initialize Output Structure based on the first valid file
    first_file = None
    # Find the first existing file to determine the required structure (dtype, shape).
    for f_path in input_list:
        if os.path.exists(f_path):
            first_file = f_path
            break

    if not first_file:
        print("❌ Error: No valid input files found.")
        return

    with h5py.File(first_file, 'r') as f_first:
        # Get the embedding dimension and data types for initialization.
        # Reading the dataset's shape attribute avoids loading the embeddings into memory,
        # and taking the last axis also covers a stored shape of (N, 1, D).
        embed_shape = f_first['embeddings'].shape[-1]
        embed_dtype = f_first['embeddings'].dtype
        url_dtype = f_first['urls'].dtype

    # Create the output file and initialize extendable datasets.
    with h5py.File(output_file, 'w') as f_output:
        # Initialize 'urls' dataset with zero length, maxshape=(None,) allows extension.
        f_output.create_dataset(
            'urls',
            shape=(0,),
            maxshape=(None,),
            dtype=url_dtype,
            chunks=True
        )
        # Initialize 'embeddings' dataset with zero length, maxshape=(None, embed_shape) allows extension.
        f_output.create_dataset(
            'embeddings',
            shape=(0, embed_shape),
            maxshape=(None, embed_shape),
            dtype=embed_dtype,
            chunks=True
        )

    pbar = tqdm(total = len(input_list), desc="Merging")

    # 2. Loop through input files and append data
    # Open the output file in append mode ('a') for modification.
    with h5py.File(output_file, 'a') as f_output:
        for file_path in input_list:
            if not os.path.exists(file_path):
                print(f"⚠️ File not found: {file_path}. Skipping.")
                pbar.update(1)
                continue

            try:
                with h5py.File(file_path, 'r') as f_input:
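                    # Read data from the current input file.
                    current_urls = f_input['urls'][:]
                    num_records = current_urls.shape[0]

                    if num_records == 0:
                        # Nothing to append; still advance the progress bar.
                        pbar.update(1)
                        continue

                    # Collapse any extra axes, e.g. (N, 1, D) -> (N, D), while keeping
                    # the record axis even when the file holds a single record.
                    current_embeddings = f_input['embeddings'][:].reshape(num_records, -1)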

                    dset_urls = f_output['urls']
                    dset_embeddings = f_output['embeddings']

                    new_size = total_records + num_records

                    # Resize the datasets in the output file to accommodate new records.
                    dset_urls.resize(new_size, axis=0)
                    dset_embeddings.resize(new_size, axis=0)

                    # Write the current file's data into the newly reserved space.
                    dset_urls[total_records:new_size] = current_urls
                    dset_embeddings[total_records:new_size] = current_embeddings

                    # Update the running total of merged records.
                    total_records = new_size

            except Exception as e:
                # Handle potential errors during file reading or resizing.
                print(f"❌ Error processing file {file_path}: {e}. Skipping this file.")

            pbar.update(1)
    pbar.close()
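
The bookkeeping in the append loop comes down to a running offset: each input file with `n` records occupies rows `total_records` up to `total_records + n` of the output. A plain-Python sketch of that arithmetic follows, with no HDF5 involved; the function name `append_slices` is hypothetical.

```python
def append_slices(record_counts):
    """Map per-file record counts to (start, stop) row ranges in the merged output.

    Files with zero records get None because nothing is written for them.
    """
    slices = []
    total_records = 0
    for num_records in record_counts:
        if num_records == 0:
            slices.append(None)
            continue
        new_size = total_records + num_records
        slices.append((total_records, new_size))
        total_records = new_size
    return slices, total_records

slices, total = append_slices([3, 0, 2])
print(slices, total)
```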

### 4. Main Processing Pipeline (Loop)

The pipeline processes the images in **chunks** of `CHUNK` images to bound memory and disk usage and to confine a failure to the chunk in which it occurs.
* For **each chunk**, the script will:
    1.  **Download Images:** Call the `downloader.py` script to download the images for the current chunk.
    2.  **Preprocess (CPU):** Use a `ThreadPoolExecutor` to run the `load_and_preprocess_image` function in parallel.
    3.  **Encode (GPU):** Gather preprocessed images into large batches, push them to the GPU, and run `model.encode_image` to get the vectors.
    4.  **Save Chunk:** Write the URLs and vectors for this chunk into a temporary HDF5 file (e.g., `OUTPUT/Images_Embedded_0.h5`).
    5.  **Cleanup Chunk:** Delete the temporary image directory (`.cache/Images`).
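
The chunk boundaries and the chunk file names follow directly from `START`, `END`, and `CHUNK`. The sketch below enumerates them for a toy range; the generator name `chunk_bounds` is hypothetical, and the file-name pattern is the one the notebook uses for its temporary files.

```python
def chunk_bounds(start, end, chunk):
    """Yield (chunk_index, first, last_exclusive) for each chunk in [start, end)."""
    for first in range(start, end, chunk):
        yield first // chunk, first, min(end, first + chunk)

for index, first, last in chunk_bounds(0, 25, 10):
    print(f"OUTPUT/Images_Embedded_{index}.h5 covers images {first}..{last - 1}")
```

The last chunk is clamped to the end of the range, so it may hold fewer images than the others.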

In [9]:
START = 0
END = n_images
CHUNK = 10000
GPU_BATCH_SIZE = 512 
NUM_WORKERS = 50

In [10]:
# Ensure the main output directory for embedded HDF5 files exists.
os.makedirs("OUTPUT", exist_ok=True) 

# Initialize a progress bar for monitoring the total images to be embedded.
pbar = tqdm(total = END - START, desc = "Embedding: ")

# Main loop: Iterate over the image indices in defined CHUNK sizes.
for x in range(START, END, CHUNK):

    # 1. DOWNLOAD IMAGES FOR CURRENT CHUNK    
    # Generate a temporary list file containing image IDs for the current chunk.
    with open(".cache/list_images.txt", 'w', encoding='utf-8') as file:
        for i in range(x, min(END, n_images, x + CHUNK)):
            # Format: 'train/<image_id>'
            file.write("train/" + image_IDs[i] + "\n")

    # Ensure the target image download directory exists.
    os.makedirs(".cache/Images", exist_ok=True)
    
    # Execute the external Python downloader script.
    # It reads the list_images.txt and downloads files into .cache/Images using 100 processes.
    subprocess.run([
        "python", ".cache/downloader.py", ".cache/list_images.txt",
        "--download_folder=.cache/Images", "--num_processes=100"
    ])

    # 2. READ & PREPROCESS IMAGES (CPU - Multi-threading)
    # List to store (URL, preprocessed_tensor_on_CPU) tuples.
    preprocessed_data_cpu = [] 
    
    # Use ThreadPoolExecutor to parallelize I/O and image loading/preprocessing.
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        # Submit the load_and_preprocess_image function for each image index in the chunk.
        futures = [executor.submit(load_and_preprocess_image, i) for i in range(x, min(END, n_images, x + CHUNK))]
        
        for future in futures:
            result = future.result()
            if result:
                # Append only successful results (not None).
                preprocessed_data_cpu.append(result)
            pbar.update(1)

    # Skip the current chunk if no images were successfully processed (e.g., all failed/corrupted).
    if not preprocessed_data_cpu:
        print(f"No images were processed in chunk {int(x / CHUNK)}")
        # Clean up the downloaded images before continuing.
        subprocess.run(["rm", "-rf", ".cache/Images"])
        continue

    # 3. EMBEDDING CALCULATION (GPU - Batch Processing)
    final_urls = []
    final_embeddings_list = []

    # Separate URLs and Tensors for batch processing.
    urls_cpu = [item[0] for item in preprocessed_data_cpu]
    tensors_cpu = [item[1] for item in preprocessed_data_cpu]

    # Disable gradient computation - CRITICAL for memory efficiency during inference.
    with torch.no_grad():
        # Iterate over the preprocessed tensors in GPU_BATCH_SIZE chunks.
        for i in range(0, len(tensors_cpu), GPU_BATCH_SIZE):
            batch_tensors_cpu = tensors_cpu[i : i + GPU_BATCH_SIZE]
            batch_urls = urls_cpu[i : i + GPU_BATCH_SIZE]
            
            # Stack individual tensors into a single batch tensor on the CPU.
            batch_on_cpu = torch.stack(batch_tensors_cpu)
            # Move the entire batch to the designated device (GPU).
            batch_on_gpu = batch_on_cpu.to(device)

            # Generate image embeddings using the CLIP model.
            vectors_gpu = model.encode_image(batch_on_gpu)
            
            # L2-normalize the vectors so that a dot product between embeddings equals their cosine similarity.
            vectors_gpu = vectors_gpu / vectors_gpu.norm(dim=-1, keepdim=True)
            
            # Convert GPU tensor to numpy array (CPU) for HDF5 writing.
            vectors_numpy = vectors_gpu.cpu().detach().numpy().astype(np.float32)

            # Store results for final HDF5 file.
            final_urls.extend(batch_urls)
            final_embeddings_list.append(vectors_numpy)

    # 4. WRITE HDF5 FILE
    # Construct the output path for the current chunk's embedding file.
    output_path = f"OUTPUT/Images_Embedded_{int(x / CHUNK)}.h5"
    
    # Only write if the file doesn't exist and we have data to write.
    if not os.path.exists(output_path) and final_embeddings_list:
        # Combine all batch numpy arrays into a single large array.
        all_embeddings_numpy = np.vstack(final_embeddings_list)
        
        # Write the URLs and embeddings into a new HDF5 file.
        with h5py.File(output_path, "w") as outfile:
            # URLs are stored as fixed-width byte strings (dtype='S'); NumPy sizes the width to the longest URL in this chunk.
            outfile.create_dataset("urls", data=np.array(final_urls, dtype='S'))
            outfile.create_dataset("embeddings", data=all_embeddings_numpy)

    # 5. CLEANUP
    # Delete the downloaded images for the current chunk to save disk space.
    subprocess.run(["rm", "-rf", ".cache/Images"])

pbar.close()
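
The L2 normalization in step 3 means that a plain dot product between two stored vectors equals their cosine similarity. A plain-Python illustration of that identity on toy 2-D vectors follows; it stands in for the torch code, and the helper names are hypothetical.

```python
import math

def l2_normalize(vec):
    """Scale vec to unit Euclidean length."""
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec]

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a, b):
    """Cosine of the angle between a and b, computed from the raw vectors."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

a, b = [3.0, 4.0], [4.0, 3.0]
# After normalization, the dot product matches the cosine similarity.
print(dot(l2_normalize(a), l2_normalize(b)), cosine_similarity(a, b))
```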


After the loop finishes, two final steps remain:
* **Merge:** Call `merge_HDF5_files` to combine all temporary HDF5 chunk files into a single final file: `OUTPUT/Images_Embedded.h5`.
* **Final Cleanup:** Delete all the temporary HDF5 chunk files.

In [11]:
# 1. MERGE CHUNKED HDF5 FILES
# Generate a list of all HDF5 chunk files created during the embedding process.
file_chunks = [
    f"OUTPUT/Images_Embedded_{int(x / CHUNK)}.h5"
    for x in range(START, END, CHUNK)
]

# Define the final, consolidated HDF5 file path.
file_gop_cuoi = "OUTPUT/Images_Embedded.h5"

# Call the function to merge all chunk files into the single final file.
merge_HDF5_files(file_chunks, file_gop_cuoi)

# 2. CLEANUP: REMOVE TEMPORARY CHUNKS
# Iterate through the indices used to generate the chunk files.
for x in range(START, END, CHUNK):
    chunk_path = f"OUTPUT/Images_Embedded_{int(x / CHUNK)}.h5"
    try:
        # Delete the individual temporary HDF5 chunk file to free up disk space.
        os.remove(chunk_path)
    except FileNotFoundError:
        # Handle case where the file might have been skipped or already deleted.
        print(f"⚠️ Warning: Chunk file not found during cleanup: {chunk_path}")
        continue
