In [1]:
# %pip installs into the running kernel's environment; the version is pinned so a
# fresh "Restart & Run All" resolves the same ImageHash release each time.
%pip install imagehash==4.3.2

Collecting imagehash
  Downloading ImageHash-4.3.2-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting PyWavelets (from imagehash)
  Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Downloading ImageHash-4.3.2-py2.py3-none-any.whl (296 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m296.7/296.7 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyWavelets, imagehash
Successfully installed PyWavelets-1.8.0 imagehash-4.3.2


In [3]:
import os
from pathlib import Path
import numpy as np
from PIL import Image
import imagehash
from collections import defaultdict
import pandas as pd
import shutil

def _iter_split_images(paths, splits=('train', 'valid', 'test'),
                       extensions=('.jpg', '.jpeg', '.png')):
    """Yield (split, class_name, img_name, img_path) for each image under <split>/<class>/."""
    for split in splits:
        split_dir = paths.get(split)
        # A split that is not configured, or whose directory is absent, is skipped.
        if not split_dir or not os.path.isdir(split_dir):
            continue
        # Sorted listings make the choice of "original" copy reproducible;
        # os.listdir returns entries in arbitrary order.
        for class_name in sorted(os.listdir(split_dir)):
            class_dir = os.path.join(split_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            for img_name in sorted(os.listdir(class_dir)):
                if img_name.lower().endswith(extensions):
                    yield split, class_name, img_name, os.path.join(class_dir, img_name)


def _unused_rename_target(directory, file_base, file_ext, tag, suffix, start):
    """Return (filename, path) of the first non-existing '<base>_<tag><k><suffix><ext>' with k >= start."""
    k = start
    while True:
        candidate_name = f"{file_base}_{tag}{k}{suffix}{file_ext}"
        candidate_path = os.path.join(directory, candidate_name)
        # lexists also treats a dangling symlink as "taken".
        if not os.path.lexists(candidate_path):
            return candidate_name, candidate_path
        k += 1


def _rename_duplicate(old_path, original_name, tag, suffix, index, duplicate_type, renamed_images):
    """Move one duplicate to a free suffixed name and log it; return True on success."""
    file_base, file_ext = os.path.splitext(original_name)
    new_filename, new_path = _unused_rename_target(
        os.path.dirname(old_path), file_base, file_ext, tag, suffix, index
    )
    try:
        shutil.move(old_path, new_path)
    except Exception as e:
        # Best effort: report the failure and carry on with the remaining files.
        print(f"Error renaming {old_path}: {str(e)}")
        return False

    renamed_images.append({
        'original_path': old_path,
        'new_path': new_path,
        'original_name': original_name,
        'new_name': new_filename,
        'duplicate_type': duplicate_type
    })
    print(f"Renamed: {old_path} -> {new_path}")
    return True


def check_and_rename_duplicate_images(paths, rename=True):
    """
    Check for duplicate images across train, valid, and test folders
    using both filename and image content comparison, and rename duplicates if specified.

    Images are looked up as <split>/<class>/<file> with a .jpg/.jpeg/.png
    extension (case-insensitive). Content is compared via
    ``imagehash.average_hash`` on the RGB image, so images sharing a hash are
    reported as content duplicates.

    When renaming, the first occurrence of each duplicate group is left alone.
    Later filename duplicates become ``<base>_tom<i>a<ext>`` and later content
    duplicates become ``<base>_dup<i>b<ext>``. If that name is already taken in
    the target directory, ``i`` is increased until a free name is found, so an
    existing file is never overwritten.

    Args:
        paths (dict): Dictionary containing paths to train, valid, and test folders.
            Missing keys or non-existent directories are skipped.
        rename (bool): Whether to rename duplicate images or just report them

    Returns:
        tuple: (filename_duplicates, content_duplicates, renamed_images)
            - filename_duplicates: Dictionary of duplicate filenames
            - content_duplicates: Dictionary of duplicate images based on content
            - renamed_images: List of images that were renamed

        The 'path' values inside both dictionaries are the paths seen during
        the scan, i.e. before any renaming.
    """
    filename_map = defaultdict(list)  # filename -> every location it was seen
    hash_map = defaultdict(list)      # content hash -> every location it was seen
    renamed_images = []

    for split, class_name, img_name, img_path in _iter_split_images(paths):
        filename_map[img_name].append({
            'path': img_path,
            'split': split,
            'class': class_name
        })

        try:
            with Image.open(img_path) as img:
                rgb_img = img if img.mode == 'RGB' else img.convert('RGB')
                img_hash = str(imagehash.average_hash(rgb_img))
        except Exception as e:
            # Unreadable files still count for the filename check above.
            print(f"Error processing {img_path}: {str(e)}")
            continue

        hash_map[img_hash].append({
            'path': img_path,
            'split': split,
            'class': class_name,
            'filename': img_name
        })

    filename_duplicates = {
        filename: locations
        for filename, locations in filename_map.items()
        if len(locations) > 1
    }
    content_duplicates = {
        hash_val: locations
        for hash_val, locations in hash_map.items()
        if len(locations) > 1
    }

    if rename:
        # Filename duplicates first; locations[0] is kept as the original.
        for filename, locations in filename_duplicates.items():
            for i, loc in enumerate(locations[1:], 1):
                _rename_duplicate(loc['path'], filename, 'tom', 'a', i,
                                  'filename', renamed_images)

        # Content duplicates next, skipping files already moved above.
        processed_paths = {item['original_path'] for item in renamed_images}
        for locations in content_duplicates.values():
            for i, loc in enumerate(locations[1:], 1):
                old_path = loc['path']
                if old_path in processed_paths:
                    continue
                if _rename_duplicate(old_path, loc['filename'], 'dup', 'b', i,
                                     'content', renamed_images):
                    processed_paths.add(old_path)

    return filename_duplicates, content_duplicates, renamed_images

def print_duplicate_summary(filename_duplicates, content_duplicates, renamed_images=None):
    """Write a plain-text digest of the duplicate scan to stdout.

    Args:
        filename_duplicates: Mapping of filename -> list of location dicts
            (each with 'split' and 'class').
        content_duplicates: Mapping of hash -> list of location dicts
            (each with 'filename', 'split' and 'class').
        renamed_images: Optional list of rename records (each with
            'original_name', 'new_name' and 'duplicate_type'). Only the first
            five are listed; the rest are summarised as a count.
    """
    print("\n=== Duplicate Analysis Summary ===")

    # --- Section 1: same filename seen in more than one place ---
    print("\nDuplicates by filename:")
    if not filename_duplicates:
        print("No duplicate filenames found.")
    else:
        for name, entries in filename_duplicates.items():
            print(f"\nFilename: {name}")
            for entry in entries:
                print(f"- Found in {entry['split']}/{entry['class']}")

    # --- Section 2: same content hash seen in more than one place ---
    print("\nDuplicates by content:")
    if not content_duplicates:
        print("No duplicate content found.")
    else:
        for digest, entries in content_duplicates.items():
            print(f"\nHash: {digest}")
            for entry in entries:
                print(f"- {entry['filename']} in {entry['split']}/{entry['class']}")

    # --- Section 3: rename log (omitted entirely when nothing was renamed) ---
    if not renamed_images:
        return

    total = len(renamed_images)
    print("\nRenamed Images:")
    print(f"Total renamed: {total}")
    for record in renamed_images[:5]:
        print(f"- {record['original_name']} -> {record['new_name']} ({record['duplicate_type']} duplicate)")
    if total > 5:
        print(f"... and {total - 5} more")

def generate_duplicate_report(filename_duplicates, content_duplicates, renamed_images=None):
    """Flatten the duplicate mappings into pandas DataFrames.

    Args:
        filename_duplicates: Mapping of filename -> list of location dicts
            (each with 'split', 'class' and 'path').
        content_duplicates: Mapping of hash -> list of location dicts
            (each with 'filename', 'split', 'class' and 'path').
        renamed_images: Optional list of rename records, used as rows as-is.

    Returns:
        tuple: (filename_df, content_df, renamed_df). Each frame has one row
        per location/record, or is a bare empty DataFrame when there is
        nothing to report.
    """
    def _to_frame(rows):
        # No rows -> an empty frame without any columns.
        return pd.DataFrame(rows) if rows else pd.DataFrame()

    # One row per place a duplicated filename was found
    filename_rows = [
        {
            'filename': name,
            'split': entry['split'],
            'class': entry['class'],
            'full_path': entry['path']
        }
        for name, entries in filename_duplicates.items()
        for entry in entries
    ]

    # One row per place a duplicated content hash was found
    content_rows = [
        {
            'hash': digest,
            'filename': entry['filename'],
            'split': entry['split'],
            'class': entry['class'],
            'full_path': entry['path']
        }
        for digest, entries in content_duplicates.items()
        for entry in entries
    ]

    return _to_frame(filename_rows), _to_frame(content_rows), _to_frame(renamed_images)

def main(rename_duplicates=True, base_dir=None):
    """Scan the dataset for duplicate images, print a summary and save CSV reports.

    Args:
        rename_duplicates (bool): When True, duplicates are renamed on disk;
            when False they are only reported.
        base_dir (str | os.PathLike | None): Dataset root containing the
            'train', 'valid' and 'test' folders. Defaults to the project's
            Google Drive location when omitted.

    Returns:
        tuple: (filename_df, content_df, renamed_df) as produced by
        generate_duplicate_report. Each non-empty frame is also written to a
        CSV file in the current working directory.
    """
    # Define base directory and paths
    if base_dir is None:
        base_dir = '/content/drive/MyDrive/SeniorProject/Tomato/RipenessClassification_Sorted'
    base_dir = Path(base_dir)
    paths = {
        'base': base_dir,
        'valid': os.path.join(str(base_dir), 'valid'),
        'train': os.path.join(str(base_dir), 'train'),
        'test': os.path.join(str(base_dir), 'test')
    }

    # Without this check a missing root (e.g. Drive not mounted) looks exactly
    # like a clean dataset, because every split is silently skipped.
    if not base_dir.is_dir():
        print(f"Warning: dataset directory not found: {base_dir}")

    # Check for duplicates and rename if requested
    print(f"Checking for duplicate images{' and renaming them' if rename_duplicates else ''}...")
    filename_duplicates, content_duplicates, renamed_images = check_and_rename_duplicate_images(
        paths, rename=rename_duplicates
    )

    # Print summary
    print_duplicate_summary(filename_duplicates, content_duplicates, renamed_images)

    # Generate detailed report
    filename_df, content_df, renamed_df = generate_duplicate_report(
        filename_duplicates, content_duplicates, renamed_images
    )

    # Save reports if duplicates were found
    if not filename_df.empty:
        filename_df.to_csv('duplicate_filenames_report.csv', index=False)
        print("\nDuplicate filenames report saved to 'duplicate_filenames_report.csv'")

    if not content_df.empty:
        content_df.to_csv('duplicate_content_report.csv', index=False)
        print("\nDuplicate content report saved to 'duplicate_content_report.csv'")

    if not renamed_df.empty:
        renamed_df.to_csv('renamed_images_report.csv', index=False)
        print("\nRenamed images report saved to 'renamed_images_report.csv'")

    return filename_df, content_df, renamed_df

# Entry point: run the duplicate scan and keep the three report DataFrames as
# top-level variables.
if __name__ == "__main__":
    # rename_duplicates=True renames duplicate files on disk.
    # Set rename_duplicates=False if you only want to detect without renaming
    filename_df, content_df, renamed_df = main(rename_duplicates=True)


Checking for duplicate images and renaming them...

=== Duplicate Analysis Summary ===

Duplicates by filename:
No duplicate filenames found.

Duplicates by content:
No duplicate content found.
