In [1]:
import cv2
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
from skimage.feature import local_binary_pattern
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
from PIL import Image
from typing import List, Tuple, Dict, Any, Generator 
import sqlite3
from tqdm import tqdm



In [8]:
# Folder that is walked recursively when collecting image metadata.
root_directory = "image_data"
# SQLite file in which the collected per-image metadata is stored.
database_path = "image_metadata.db"

In [13]:
def find_image_files_with_metadata(root_dir: str, batch_size: int = 1000) -> Generator[List[Dict], None, None]:
    """
    Recursively find image files under a directory and yield their basic metadata in batches.

    A file is treated as an image when its name ends with .jpg, .jpeg or .png
    (case-insensitive). Each candidate is decoded with cv2.imread to obtain its
    dimensions; files for which imread returns None are skipped silently.

    Args:
    - root_dir (str): The root directory to start searching for images.
    - batch_size (int): Maximum number of metadata entries per batch. Must be >= 1.

    Yields:
    - List[Dict]: A batch of metadata dictionaries with the keys 'file_name',
      'file_path', 'directory', 'width' and 'height'. Every batch holds exactly
      batch_size entries except possibly the last one.

    Raises:
    - ValueError: If batch_size is smaller than 1. Because this is a generator,
      the error surfaces when iteration starts, not when the function is called.
    """
    # A non-positive size would never match len(batch) and silently collapse
    # the whole scan into one unbounded batch, defeating the batching.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    image_extensions = ('.jpg', '.jpeg', '.png')
    metadata_batch = []
    for subdir, _dirs, files in os.walk(root_dir):
        for file in files:
            if not file.lower().endswith(image_extensions):
                continue
            full_path = os.path.join(subdir, file)
            image = cv2.imread(full_path)
            if image is None:
                # Unreadable or unsupported file: leave it out of the metadata.
                continue
            # Slice instead of 3-way unpacking so a 2-D array would work as well.
            height, width = image.shape[:2]
            metadata_batch.append({
                'file_name': file,
                'file_path': full_path,
                'directory': subdir,
                'width': width,
                'height': height,
            })
            if len(metadata_batch) == batch_size:
                yield metadata_batch
                metadata_batch = []
    # Flush the final, possibly shorter, batch.
    if metadata_batch:
        yield metadata_batch

In [15]:
def setup_database(db_path: str):
    """
    Set up the SQLite database with the required schema.

    Any existing `images` table is dropped first, so calling this function
    discards previously stored metadata and leaves an empty table.

    Args:
    - db_path (str): Path to the SQLite database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Drop the existing table if it exists
        cursor.execute('''
    DROP TABLE IF EXISTS images
    ''')

        # Create the new table with the updated schema
        cursor.execute('''
    CREATE TABLE images (
        image_id INTEGER PRIMARY KEY AUTOINCREMENT,
        file_name TEXT NOT NULL,
        file_path TEXT NOT NULL,
        directory TEXT NOT NULL,
        width INTEGER NOT NULL,
        height INTEGER NOT NULL
    )
    ''')
        conn.commit()
    finally:
        # Release the file handle even when one of the statements fails.
        conn.close()

def insert_metadata_batch(conn: sqlite3.Connection, metadata_batch: List[Dict]):
    """
    Insert a batch of image metadata into the database atomically.

    The whole batch is written inside one transaction: it is committed when
    every row was inserted and rolled back when any row fails, so a failed
    batch never leaves some of its rows pending for a later commit.

    Args:
    - conn (sqlite3.Connection): SQLite connection object. It is left open.
    - metadata_batch (List[Dict]): List of metadata dictionaries providing the
      keys 'file_name', 'file_path', 'directory', 'width' and 'height'.

    Raises:
    - sqlite3.Error: Propagated unchanged after the rollback when an insert fails.
    """
    # Using the connection as a context manager commits on success and rolls
    # back on an exception; it does not close the connection.
    with conn:
        cursor = conn.cursor()
        cursor.executemany('''
    INSERT INTO images (file_name, file_path, directory, width, height)
    VALUES (:file_name, :file_path, :directory, :width, :height)
    ''', metadata_batch)

def retrieve_metadata(conn: sqlite3.Connection, image_id: int) -> Dict:
    """
    Look up the stored metadata of a single image by its primary key.

    Args:
    - conn (sqlite3.Connection): SQLite connection object.
    - image_id (int): ID of the image to retrieve metadata for.

    Returns:
    - dict: Mapping with the keys 'image_id', 'file_name', 'file_path',
      'directory', 'width' and 'height'; None when no row has that ID.
    """
    # Keys are listed in the same order as the columns of the SELECT below.
    column_names = ('image_id', 'file_name', 'file_path', 'directory', 'width', 'height')
    lookup = conn.cursor()
    lookup.execute('''
    SELECT image_id, file_name, file_path, directory, width, height FROM images WHERE image_id = ?
    ''', (image_id,))
    record = lookup.fetchone()
    if not record:
        return None
    return dict(zip(column_names, record))

# Example usage: rebuild the metadata database from the images on disk.
if __name__ == "__main__":
    root_directory = "image_data"
    database_path = "image_metadata.db"

    # Set up database (drops and recreates the images table)
    setup_database(database_path)

    # Size the progress bar by counting only files with an image extension.
    # Counting every file overstates the total, so the bar can never reach it
    # when the tree also contains non-image files.
    image_extensions = ('.jpg', '.jpeg', '.png')
    total_images = sum(
        1
        for _, _, files in os.walk(root_directory)
        for file in files
        if file.lower().endswith(image_extensions)
    )

    # Open a connection to the database
    conn = sqlite3.connect(database_path)
    try:
        # The context manager closes the bar even if the ingestion fails.
        with tqdm(total=total_images, desc="Processing Images", unit="image") as progress_bar:
            # Collect and insert metadata in batches
            for metadata_batch in find_image_files_with_metadata(root_directory):
                insert_metadata_batch(conn, metadata_batch)
                progress_bar.update(len(metadata_batch))

        # Retrieve metadata for testing (this line just retrieves one image's metadata as a test)
        image_metadata = retrieve_metadata(conn, 1)
        print(image_metadata)
    finally:
        # Close the database connection, also when an error interrupted the run
        conn.close()

Processing Images: 100%|█████████▉| 444668/444682 [7:39:08<00:00, 16.14image/s]   

{'image_id': 1, 'file_name': '000000000034.jpg', 'file_path': 'image_data\\coco2017_train\\train2017\\000000000034.jpg', 'directory': 'image_data\\coco2017_train\\train2017', 'width': 640, 'height': 425}





In [None]:
def load_images_in_batches(image_paths: List[str], batch_size: int = 100) -> Generator[List[np.ndarray], None, None]:
    """
    Yield batches of RGB images loaded from disk.

    Each path is read with cv2.imread and converted from BGR to RGB. Paths for
    which imread returns None are skipped, so the yielded images do not
    necessarily line up one-to-one with image_paths.

    Args:
    - image_paths (List[str]): Paths of the image files to load.
    - batch_size (int): Maximum number of images per batch. Must be >= 1.

    Yields:
    - List[np.ndarray]: A batch of RGB images. Every batch holds exactly
      batch_size images except possibly the last one.

    Raises:
    - ValueError: If batch_size is smaller than 1. Because this is a generator,
      the error surfaces when iteration starts, not when the function is called.
    """
    # A non-positive size can never be matched by a filled batch and would
    # produce empty or unbounded batches instead of failing visibly.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    batch = []
    for path in image_paths:
        image = cv2.imread(path, cv2.IMREAD_COLOR)
        if image is None:
            continue
        batch.append(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        # The batch can only become full right after an append.
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

# Ensure metadata_df is defined. No cell assigns it directly, so on a fresh
# kernel it is rebuilt from the SQLite database written by the ingestion cell.
if 'metadata_df' not in globals() and os.path.exists(database_path):
    # The existence check matters: sqlite3.connect would otherwise create an
    # empty database file as a side effect.
    metadata_conn = sqlite3.connect(database_path)
    try:
        metadata_df = pd.read_sql_query(
            "SELECT image_id, file_name, file_path, directory, width, height "
            "FROM images ORDER BY image_id",
            metadata_conn,
        )
    finally:
        metadata_conn.close()

if 'metadata_df' in globals():
    # Get image paths from the metadata DataFrame
    image_paths = metadata_df['file_path'].tolist()

    # Load the first batch of images; the default avoids StopIteration when
    # there are no paths or none of them can be read.
    image_batches = load_images_in_batches(image_paths, batch_size=10)
    first_batch = next(image_batches, [])

    # Display the first batch of images
    for i, img in enumerate(first_batch):
        plt.figure()
        plt.imshow(img)
        plt.title(f"Image {i+1}")
        plt.axis('off')
        plt.show()  # Ensure each image is displayed
else:
    print("metadata_df is not defined. Please run the previous cell to collect basic metadata.")

In [None]:
def extract_features(image: np.ndarray, model: PCA) -> np.ndarray:
    """
    Project an image through the given model (PCA stands in for a deep model).

    The image is flattened into a single row vector of shape (1, n_values) and
    handed to model.transform; whatever transform returns is passed back.
    """
    pixel_values = image.flatten()
    # Add a leading axis so the model sees one sample with n_values features.
    single_sample = pixel_values[np.newaxis, :]
    return model.transform(single_sample)

def color_similarity(image1: np.ndarray, image2: np.ndarray) -> float:
    """
    Compare two images by the correlation of their 3-D colour histograms.

    Each histogram is taken jointly over channels 0, 1 and 2 with 8 bins per
    channel covering the value range [0, 256).
    """
    channels = [0, 1, 2]
    bins_per_channel = [8, 8, 8]
    value_ranges = [0, 256, 0, 256, 0, 256]

    first_hist, second_hist = (
        cv2.calcHist([picture], channels, None, bins_per_channel, value_ranges)
        for picture in (image1, image2)
    )
    return cv2.compareHist(first_hist, second_hist, cv2.HISTCMP_CORREL)

def custom_similarity(image1: np.ndarray, image2: np.ndarray) -> float:
    """
    Compare the texture of two RGB images via Local Binary Pattern histograms.

    Each image is converted to grayscale, encoded with uniform LBP (arguments
    24 and 8), summarised as a 26-bin histogram over the integer edges 0..26
    and the two float32 histograms are compared with cosine similarity.
    """
    bin_edges = np.arange(0, 27)
    texture_histograms = []
    for picture in (image1, image2):
        gray = cv2.cvtColor(picture, cv2.COLOR_RGB2GRAY)
        lbp_codes = local_binary_pattern(gray, 24, 8, method='uniform')
        counts, _ = np.histogram(lbp_codes.ravel(), bins=bin_edges, range=(0, 26))
        texture_histograms.append(counts.astype('float32'))

    first_hist, second_hist = texture_histograms
    similarity_matrix = cosine_similarity([first_hist], [second_hist])
    return similarity_matrix[0][0]

In [None]:
def process_image_batch(batch: List[np.ndarray], reference_image: np.ndarray, model: PCA):
    """
    Score every image of a batch against a reference image.

    For each image three metrics are computed: colour-histogram similarity,
    cosine similarity of the model features, and LBP texture similarity.
    Progress is printed per image and the scores of the first five images are
    printed as a summary.

    Returns a list with one dict per image holding the keys
    'color_similarity', 'deep_learning_similarity' and 'custom_similarity'.
    """
    # The reference features do not depend on the loop, so compute them once.
    reference_features = extract_features(reference_image, model)
    total = len(batch)
    results = []

    for position, candidate in enumerate(batch, start=1):
        print(f"Processing image {position}/{total}")  # Debug statement
        color_score = color_similarity(reference_image, candidate)
        candidate_features = extract_features(candidate, model)
        feature_score = cosine_similarity(reference_features, candidate_features)[0][0]
        texture_score = custom_similarity(reference_image, candidate)
        results.append({
            'color_similarity': color_score,
            'deep_learning_similarity': feature_score,
            'custom_similarity': texture_score
        })

    # Display results for the first few images in the batch
    for index in range(min(5, len(results))):
        scores = results[index]
        print(f"Image {index}:")
        print(f"Color Similarity: {scores['color_similarity']:.4f}")
        print(f"Deep Learning Similarity: {scores['deep_learning_similarity']:.4f}")
        print(f"Custom Similarity: {scores['custom_similarity']:.4f}")
        print("-" * 30)

    return results

In [None]:
def deep_learning_similarity(image1: np.ndarray, image2: np.ndarray, model: PCA) -> float:
    """
    Cosine similarity between the model features of two images.

    The "deep learning" features are whatever extract_features produces for
    the supplied model, i.e. the result of model.transform on the flattened image.
    """
    first_features, second_features = (
        extract_features(picture, model) for picture in (image1, image2)
    )
    similarity_matrix = cosine_similarity(first_features, second_features)
    return similarity_matrix[0][0]

In [None]:
# Default feature model. It is created unfitted here; it has to be fitted
# (pca_model.fit(...)) before find_best_matches can use it.
pca_model = PCA()
def find_best_matches(input_image: np.ndarray, all_images: List[np.ndarray], num_matches: int = 5, model: PCA = None) -> List[Tuple[float, int]]:
    """
    Find the best matches for the input image from all images.

    The score of a candidate is the unweighted sum of its colour, model-feature
    and texture similarity to the input image.

    Args:
    - input_image (np.ndarray): Image to find matches for.
    - all_images (List[np.ndarray]): Candidate images.
    - num_matches (int): Maximum number of matches to return; values <= 0
      yield an empty list.
    - model (PCA): Fitted model used for the feature similarity. Defaults to
      the module-level pca_model, looked up at call time.

    Returns:
    - List[Tuple[float, int]]: (score, index into all_images) pairs, best score
      first, at most num_matches entries.

    Raises:
    - sklearn.exceptions.NotFittedError: If the model has not been fitted.
    """
    from sklearn.utils.validation import check_is_fitted

    if model is None:
        model = pca_model
    # A negative count would otherwise slice from the wrong end of the ranking.
    if num_matches <= 0 or len(all_images) == 0:
        return []
    # Fail before any image work instead of inside the first transform call.
    check_is_fitted(model)

    similarities = []
    for idx, image in enumerate(all_images):
        score = (
            color_similarity(input_image, image) +
            deep_learning_similarity(input_image, image, model) +
            custom_similarity(input_image, image)
        )
        similarities.append((score, idx))
    # Sort on the score only, so ties keep their original relative order.
    similarities.sort(reverse=True, key=lambda x: x[0])
    return similarities[:num_matches]