In [None]:
# Advanced Duplicate Image Detection System
# This script uses perceptual hashing and computer vision techniques to efficiently detect duplicate images

# Standard library
import hashlib
import os
import subprocess
import sys
import time
from collections import defaultdict
from pathlib import Path

# Third-party packages imported unconditionally
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image

# Third-party packages installed on demand.
# The guarded import has to be the FIRST import of each package: an earlier
# unconditional import would raise ImportError before the fallback could run.
# Installing through `sys.executable -m pip` targets the interpreter that is
# running this code instead of whichever `pip` is first on PATH, and
# check=True surfaces a failed install instead of a confusing second
# ImportError.
try:
    import imagehash
except ImportError:
    subprocess.run([sys.executable, "-m", "pip", "install", "imagehash"], check=True)
    import imagehash

try:
    import cv2
except ImportError:
    subprocess.run([sys.executable, "-m", "pip", "install", "opencv-python"], check=True)
    import cv2

print("Libraries imported successfully!")


In [None]:
class AdvancedImageDuplicateDetector:
    """
    Decide whether two images are near-duplicates using perceptual hashes.

    Four hash functions from the ``imagehash`` package (phash, dhash, whash,
    average_hash) are computed per image. Two images count as similar when
    at least ``min_matching_hashes`` of those hash types are within
    ``similarity_threshold`` of each other.

    The class only hashes single images and compares single pairs; how many
    pairs are compared (and therefore the overall cost) is up to the caller.
    """

    def __init__(self, similarity_threshold=5, min_matching_hashes=2):
        """
        Args:
            similarity_threshold: Largest per-hash distance that still counts
                as a match (lower = stricter).
            min_matching_hashes: Number of hash types that must match before
                two images are reported as similar. Defaults to 2.
        """
        self.similarity_threshold = similarity_threshold
        self.min_matching_hashes = min_matching_hashes
        self.hash_algorithms = {
            'phash': imagehash.phash,
            'dhash': imagehash.dhash,
            'whash': imagehash.whash,
            'average_hash': imagehash.average_hash
        }

    def compute_image_hashes(self, image_path):
        """
        Compute every configured perceptual hash for one image file.

        Args:
            image_path: Path of the image to hash.

        Returns:
            Dict mapping hash name to hash value, or None when the image
            could not be opened or hashed (the error is printed).
        """
        try:
            with Image.open(image_path) as img:
                # Normalise the mode so every hash function sees RGB input
                if img.mode != 'RGB':
                    img = img.convert('RGB')

                return {
                    name: hash_func(img)
                    for name, hash_func in self.hash_algorithms.items()
                }
        except Exception as e:
            # Best effort: one unreadable file must not abort a whole scan
            print(f"Error processing {image_path}: {e}")
            return None

    def hamming_distance(self, hash1, hash2):
        """
        Return the distance between two hashes.

        Relies on the subtraction operator of the hash objects;
        ``imagehash.ImageHash`` defines it as the number of differing bits.
        """
        return hash1 - hash2

    def are_images_similar(self, hashes1, hashes2):
        """
        Check whether two hash dicts describe similar images.

        Args:
            hashes1: Hash dict from compute_image_hashes (or None/empty).
            hashes2: Hash dict from compute_image_hashes (or None/empty).

        Returns:
            True when at least ``min_matching_hashes`` hash types present in
            both dicts are within ``similarity_threshold``; False otherwise,
            including when either dict is missing or empty.
        """
        if not hashes1 or not hashes2:
            return False

        matches = 0
        for hash_type in self.hash_algorithms:
            if hash_type not in hashes1 or hash_type not in hashes2:
                continue

            distance = self.hamming_distance(hashes1[hash_type], hashes2[hash_type])
            if distance <= self.similarity_threshold:
                matches += 1
                # Stop early once enough hash types agree
                if matches >= self.min_matching_hashes:
                    return True

        # Also covers min_matching_hashes <= 0, where no match is required
        return matches >= self.min_matching_hashes

print("AdvancedImageDuplicateDetector class defined successfully!")


In [None]:
def find_duplicate_images(image_folder_path, similarity_threshold=5):
    """
    Find groups of visually similar images under a folder (recursive search).

    Every image is hashed once, then each not-yet-grouped image is compared
    against every later not-yet-grouped image. An image joins the group of
    the first earlier image it is similar to.

    Time complexity: hashing is O(n); grouping is pairwise, so up to O(n^2)
    hash comparisons when few images are duplicates (n = number of images).

    Args:
        image_folder_path: Folder to scan, including its subfolders.
        similarity_threshold: Largest per-hash distance treated as a match.

    Returns:
        List of groups; each group is a list of two or more image paths.
        Empty when the folder does not exist or holds no duplicates.
    """
    # os.walk silently yields nothing for a missing folder; say so explicitly
    if not os.path.isdir(image_folder_path):
        print(f"Image folder not found: {image_folder_path}")
        return []

    detector = AdvancedImageDuplicateDetector(similarity_threshold)

    # Get all image files
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.gif', '.webp')
    image_files = []

    for root, dirs, files in os.walk(image_folder_path):
        # Sort in place so traversal order (and therefore which file ends up
        # first in each group) does not depend on the filesystem
        dirs.sort()
        for file in sorted(files):
            if file.lower().endswith(image_extensions):
                image_files.append(os.path.join(root, file))

    print(f"Found {len(image_files)} image files to analyze...")

    # Compute hashes for all images
    image_hashes = {}
    start_time = time.time()

    for i, image_path in enumerate(image_files):
        if i % 50 == 0:
            print(f"Processing image {i+1}/{len(image_files)}...")

        hashes = detector.compute_image_hashes(image_path)
        if hashes:
            # Unreadable images (hashes is None) are left out of the comparison
            image_hashes[image_path] = hashes

    print(f"Hash computation completed in {time.time() - start_time:.2f} seconds")

    # Group similar images: each ungrouped image anchors a new group and
    # collects every later ungrouped image that is similar to it
    duplicates = []
    processed = set()

    image_paths = list(image_hashes)

    for i, anchor in enumerate(image_paths):
        if anchor in processed:
            continue

        current_group = [anchor]
        processed.add(anchor)

        for candidate in image_paths[i + 1:]:
            if candidate in processed:
                continue

            if detector.are_images_similar(image_hashes[anchor], image_hashes[candidate]):
                current_group.append(candidate)
                processed.add(candidate)

        if len(current_group) > 1:
            duplicates.append(current_group)

    return duplicates

print("Duplicate detection function defined successfully!")


In [None]:
def display_duplicate_groups_with_keep_delete(duplicate_groups):
    """
    Show each duplicate group and ask the user whether to keep or prune it.

    For every group the file paths are printed, up to four of its images are
    drawn side by side, and the user is prompted until they answer
    'k'/'keep' or 'd'/'delete'.

    Args:
        duplicate_groups: List of groups; each group is a list of image paths.

    Returns:
        Tuple ``(kept_groups, deleted_groups)``. Each is a list of
        ``(group_number, group)`` pairs, where group_number is the 1-based
        position of the group in ``duplicate_groups``. Both lists are empty
        when there is nothing to review.
    """
    if not duplicate_groups:
        print("No duplicate images found!")
        # Same two-list shape as the normal path so callers can always unpack
        return [], []

    print(f"Found {len(duplicate_groups)} groups of duplicate images:")
    print("=" * 60)

    kept_groups = []
    deleted_groups = []

    for group_idx, group in enumerate(duplicate_groups, 1):
        print(f"\nGroup {group_idx}: {len(group)} duplicate images")
        print("-" * 40)

        # Display file paths
        for i, image_path in enumerate(group):
            print(f"{i+1}. {image_path}")

        # Display images in a single-row grid (at most four per group)
        num_images_to_show = min(len(group), 4)
        # squeeze=False always yields a 2-D array of axes, even for one image
        fig, axes_grid = plt.subplots(1, num_images_to_show, figsize=(15, 4), squeeze=False)
        axes = axes_grid[0]

        if len(group) > 4:
            print(f"   (Showing first 4 out of {len(group)} images)")

        for i, image_path in enumerate(group[:num_images_to_show]):
            try:
                # Close the file handle as soon as the pixels have been drawn
                with Image.open(image_path) as img:
                    axes[i].imshow(img)
                axes[i].set_title(f"Image {i+1}", fontsize=10)
                axes[i].axis('off')
            except Exception as e:
                print(f"Error displaying {image_path}: {e}")
                # Show error text in the subplot
                axes[i].text(0.5, 0.5, f"Error loading\nimage {i+1}",
                           ha='center', va='center', transform=axes[i].transAxes)
                axes[i].axis('off')

        plt.tight_layout()
        plt.show()
        # Release the figure so reviewing many groups does not pile up open figures
        plt.close(fig)

        # User input for keep/delete decision
        while True:
            user_input = input(f"Group {group_idx}: What to do with this duplicate group? (k/keep all files, d/delete duplicates only): ").strip().lower()
            if user_input in ('k', 'keep'):
                kept_groups.append((group_idx, group))
                print(f"✅ Group {group_idx} - keeping all files (no deletion)")
                break
            elif user_input in ('d', 'delete'):
                deleted_groups.append((group_idx, group))
                print(f"🗑️ Group {group_idx} - will delete duplicates (keep first file)")
                break
            else:
                print("Please enter 'k/keep' to keep all files or 'd/delete' to delete duplicates only")

        print()

    print(f"\n{'='*60}")
    print("REVIEW SUMMARY")
    print(f"{'='*60}")
    print(f"Total groups reviewed: {len(duplicate_groups)}")
    print(f"Groups keeping all files: {len(kept_groups)}")
    print(f"Groups deleting duplicates: {len(deleted_groups)}")

    return kept_groups, deleted_groups

print("Display function with keep/delete option defined successfully!")


In [None]:
# Main execution - configure and run duplicate detection
# Set this to the folder that should be scanned (subfolders are included)
IMAGE_FOLDER_PATH = r"path"

# Adjust similarity threshold (lower = more strict, higher = more lenient)
# Recommended: 5-10 for exact/near-exact duplicates, 10-15 for similar images
SIMILARITY_THRESHOLD = 5

print(f"Analyzing images in: {IMAGE_FOLDER_PATH}")
print(f"Similarity threshold: {SIMILARITY_THRESHOLD}")
print("Starting duplicate detection...")

# Run the duplicate detection and time the whole scan (hashing + grouping)
start_time = time.time()
duplicate_groups = find_duplicate_images(IMAGE_FOLDER_PATH, SIMILARITY_THRESHOLD)
total_time = time.time() - start_time

print(f"\nDuplicate detection completed in {total_time:.2f} seconds")
# Grouping compares image pairs, so the cost is quadratic in the worst case,
# not O(n log n)
print("Time complexity: O(n) hashing + up to O(n^2) pairwise hash comparisons, where n = number of images")


In [None]:
# SUMMARY AND MANUAL REVIEW OF DUPLICATES
# Show every duplicate group, record the keep/delete decision for each one,
# then write a full report plus a plain list of the files marked for deletion.

print("📋 MANUAL REVIEW OF DUPLICATE GROUPS")
print("Review each group:")
print("- Keep: Keep ALL files in the group (no deletion)")
print("- Delete: Keep first file, delete the rest (duplicates)")
print()

# The review helper is expected to hand back (kept_groups, deleted_groups).
# An empty result (nothing to review) is normalised to two empty lists so the
# unpacking cannot fail.
review_result = display_duplicate_groups_with_keep_delete(duplicate_groups)
kept_groups, deleted_groups = review_result if review_result else ([], [])

# Process groups based on user decisions
files_to_delete = []
files_to_keep = []

# For kept groups - keep ALL files (no deletion)
for group_idx, group in kept_groups:
    files_to_keep.extend(group)  # Keep all files in the group

# For deleted groups - keep first file, delete the rest (duplicates)
for group_idx, group in deleted_groups:
    files_to_keep.append(group[0])  # Keep the first file
    files_to_delete.extend(group[1:])  # Delete the duplicates

# Summary statistics
total_kept_groups = len(kept_groups)
total_deleted_groups = len(deleted_groups)
total_files_to_delete = len(files_to_delete)
total_files_to_keep = len(files_to_keep)

print(f"\n{'='*60}")
print("FINAL SUMMARY STATISTICS")
print(f"{'='*60}")
print(f"Groups keeping all files: {total_kept_groups}")
print(f"Groups deleting duplicates: {total_deleted_groups}")
print(f"Total files to keep: {total_files_to_keep}")
print(f"Total files to delete: {total_files_to_delete}")

# Save comprehensive report (only when at least one group was reviewed)
if kept_groups or deleted_groups:
    # Save main report
    with open('duplicate_report.txt', 'w', encoding='utf-8') as f:
        f.write("DUPLICATE IMAGE DETECTION REPORT\n")
        f.write("="*60 + "\n\n")

        if kept_groups:
            f.write("KEPT GROUPS (keeping ALL files, no deletion):\n")
            f.write("-"*50 + "\n")
            for original_group_idx, group in kept_groups:
                f.write(f"Group {original_group_idx} ({len(group)} files - all kept):\n")
                for image_path in group:
                    f.write(f"  KEEP: {image_path}\n")
                f.write("\n")

        if deleted_groups:
            f.write("DUPLICATE DELETION GROUPS (keeping first, deleting rest):\n")
            f.write("-"*50 + "\n")
            for original_group_idx, group in deleted_groups:
                f.write(f"Group {original_group_idx} ({len(group)} files - keeping first, deleting duplicates):\n")
                f.write(f"  KEEP: {group[0]}\n")
                for image_path in group[1:]:
                    f.write(f"  DELETE: {image_path}\n")
                f.write("\n")

        f.write("SUMMARY:\n")
        f.write(f"Groups keeping all files: {total_kept_groups}\n")
        f.write(f"Groups deleting duplicates: {total_deleted_groups}\n")
        f.write(f"Total files to keep: {total_files_to_keep}\n")
        f.write(f"Total files to delete: {total_files_to_delete}\n")

    # Save separate deletion list: three header lines, a blank line, then one
    # path per line
    with open('files_to_delete.txt', 'w', encoding='utf-8') as f:
        f.write("FILES TO DELETE\n")
        f.write("="*60 + "\n")
        f.write(f"Total files marked for deletion: {total_files_to_delete}\n\n")

        for file_path in files_to_delete:
            f.write(f"{file_path}\n")

    print("\n✅ Complete report saved to 'duplicate_report.txt'")
    print("✅ Deletion list saved to 'files_to_delete.txt'")
    print("Ready for deletion process!")
else:
    print("\n✅ No duplicate groups found - your image collection is clean!")


In [None]:
# DELETION PROCESS
# Delete files listed in files_to_delete.txt

import os
import time

def delete_files_from_list(file_list_path):
    """
    Delete the files named in a deletion list after asking for confirmation.

    The list is read as UTF-8 text with one path per line. Blank lines and
    lines starting with 'FILES TO DELETE', '=' or 'Total files' are treated
    as header lines and skipped; paths that no longer exist are ignored.
    After deleting, a timestamped ``deletion_log_<epoch>.txt`` is written
    using a relative path (i.e. into the current working directory).

    Args:
        file_list_path: Path of the text file listing the files to delete.

    Returns:
        Tuple ``(deleted_count, failed_count)``. ``(0, 0)`` when the list is
        missing, names no existing files, or the user declines, so callers
        can always unpack the result.
    """
    if not os.path.exists(file_list_path):
        print(f"❌ File list not found: {file_list_path}")
        return 0, 0

    print(f"📋 Reading deletion list from: {file_list_path}")

    # Read the files to delete, skipping header lines and empty lines
    header_prefixes = ('FILES TO DELETE', '=', 'Total files')
    files_to_delete = []
    with open(file_list_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(header_prefixes):
                continue
            # Ignore entries already removed (e.g. by an earlier run)
            if os.path.exists(line):
                files_to_delete.append(line)

    if not files_to_delete:
        print("✅ No files to delete found or all files already deleted.")
        return 0, 0

    print(f"Found {len(files_to_delete)} files to delete")

    # Confirm deletion
    print(f"\n⚠️  WARNING: About to delete {len(files_to_delete)} duplicate files!")
    print("This action cannot be undone.")

    while True:
        confirm = input("Do you want to proceed with deletion? (yes/no): ").strip().lower()
        if confirm in ('yes', 'y'):
            break
        elif confirm in ('no', 'n'):
            print("❌ Deletion cancelled by user.")
            return 0, 0
        else:
            print("Please enter 'yes' or 'no'")

    # Perform deletion
    deleted_files = []
    failed_files = []

    print(f"\n🗑️  Starting deletion process...")
    start_time = time.time()

    for i, file_path in enumerate(files_to_delete, 1):
        try:
            print(f"Deleting {i}/{len(files_to_delete)}: {os.path.basename(file_path)}")
            os.remove(file_path)
            deleted_files.append(file_path)
        except OSError as e:
            # Keep going: one locked or protected file must not stop the rest
            print(f"❌ Failed to delete {file_path}: {e}")
            failed_files.append((file_path, str(e)))

    deleted_count = len(deleted_files)
    failed_count = len(failed_files)
    deletion_time = time.time() - start_time

    # Results summary
    print(f"\n{'='*60}")
    print("DELETION RESULTS")
    print(f"{'='*60}")
    print(f"✅ Successfully deleted: {deleted_count} files")
    print(f"❌ Failed to delete: {failed_count} files")
    print(f"⏱️  Time taken: {deletion_time:.2f} seconds")

    if failed_files:
        print(f"\n❌ Failed deletions:")
        for file_path, error in failed_files:
            print(f"  {file_path} - {error}")

    # Save deletion log
    log_filename = f"deletion_log_{int(time.time())}.txt"
    with open(log_filename, 'w', encoding='utf-8') as f:
        f.write("DELETION LOG\n")
        f.write("=" * 60 + "\n\n")
        f.write(f"Deletion completed on: {time.ctime()}\n")
        f.write(f"Total files processed: {len(files_to_delete)}\n")
        f.write(f"Successfully deleted: {deleted_count}\n")
        f.write(f"Failed to delete: {failed_count}\n")
        f.write(f"Time taken: {deletion_time:.2f} seconds\n\n")

        if deleted_files:
            f.write("SUCCESSFULLY DELETED FILES:\n")
            f.write("-" * 30 + "\n")
            for file_path in deleted_files:
                f.write(f"✅ {file_path}\n")
            f.write("\n")

        if failed_files:
            f.write("FAILED DELETIONS:\n")
            f.write("-" * 30 + "\n")
            for file_path, error in failed_files:
                f.write(f"❌ {file_path} - {error}\n")

    print(f"\n📝 Deletion log saved to: {log_filename}")

    return deleted_count, failed_count

# Execute the deletion
if os.path.exists('files_to_delete.txt'):
    print("🚀 Starting file deletion process...")
    # Tolerate a None result from an early exit (list empty, nothing left to
    # delete, or the user answered "no") instead of failing on the unpack
    deletion_result = delete_files_from_list('files_to_delete.txt')
    deleted, failed = deletion_result if deletion_result else (0, 0)

    if deleted > 0:
        print(f"\n🎉 Deletion process completed!")
        print(f"   Space freed up by deleting {deleted} duplicate files")

        # Optionally clean up the deletion list file
        cleanup = input("\nDo you want to delete the 'files_to_delete.txt' file? (yes/no): ").strip().lower()
        if cleanup in ('yes', 'y'):
            try:
                os.remove('files_to_delete.txt')
                print("✅ Cleanup completed - 'files_to_delete.txt' deleted")
            except OSError as e:
                print(f"❌ Failed to delete 'files_to_delete.txt': {e}")
else:
    print("❌ No 'files_to_delete.txt' file found. Run the duplicate detection first.")


In [None]:
# VERIFICATION: Re-scan for remaining duplicates after deletion
# This will show if there are any remaining duplicates without user interaction.
# Uses the same folder and threshold as the initial scan, so the two results
# are directly comparable.

print("🔍 VERIFICATION SCAN: Checking for remaining duplicates after deletion...")
print("=" * 60)

# Re-run duplicate detection on the same folder; the timing covers the whole
# scan performed by find_duplicate_images
verification_start_time = time.time()
remaining_duplicates = find_duplicate_images(IMAGE_FOLDER_PATH, SIMILARITY_THRESHOLD)
verification_time = time.time() - verification_start_time

print(f"\nVerification scan completed in {verification_time:.2f} seconds")

def display_remaining_duplicates_readonly(duplicate_groups):
    """
    Print and plot remaining duplicate groups without asking for any input.

    Intended for verification only: nothing is deleted or modified. For each
    group the file names and full paths are printed and up to four of its
    images are drawn side by side.

    Args:
        duplicate_groups: List of groups; each group is a list of image paths.

    Returns:
        None.
    """
    if not duplicate_groups:
        print("\n🎉 SUCCESS: No duplicate images found!")
        print("✅ Your image collection is now clean of duplicates.")
        return

    print(f"\n⚠️  Found {len(duplicate_groups)} groups of remaining duplicate images:")
    print("=" * 60)

    total_remaining_files = sum(len(group) for group in duplicate_groups)
    print(f"Total remaining duplicate files: {total_remaining_files}")
    print("(This is for verification - no action will be taken)")
    print()

    for group_idx, group in enumerate(duplicate_groups, 1):
        print(f"\nRemaining Group {group_idx}: {len(group)} duplicate images")
        print("-" * 40)

        # Display file paths
        for i, image_path in enumerate(group):
            print(f"{i+1}. {os.path.basename(image_path)}")
            print(f"   {image_path}")

        # Display images in a single-row grid (limit to 4 images for display)
        num_images_to_show = min(len(group), 4)
        # squeeze=False always yields a 2-D array of axes, even for one image
        fig, axes_grid = plt.subplots(1, num_images_to_show, figsize=(15, 4), squeeze=False)
        axes = axes_grid[0]

        if len(group) > 4:
            print(f"   (Showing first 4 out of {len(group)} images)")

        for i, image_path in enumerate(group[:num_images_to_show]):
            try:
                # Close the file handle as soon as the pixels have been drawn
                with Image.open(image_path) as img:
                    axes[i].imshow(img)
                axes[i].set_title(f"Image {i+1}", fontsize=10)
                axes[i].axis('off')
            except Exception as e:
                print(f"Error displaying {image_path}: {e}")
                # Show error text in the subplot; a real newline splits the
                # message over two lines (an escaped backslash would be drawn
                # literally)
                axes[i].text(0.5, 0.5, f"Error loading\nimage {i+1}",
                           ha='center', va='center', transform=axes[i].transAxes)
                axes[i].axis('off')

        plt.tight_layout()
        plt.show()
        # Release the figure so many groups do not pile up open figures
        plt.close(fig)
        print()

# Show whatever the verification scan found (read-only, nothing is deleted)
display_remaining_duplicates_readonly(remaining_duplicates)

# Closing summary for the verification pass
if not remaining_duplicates:
    print(
        "\n🎉 VERIFICATION COMPLETE:\n"
        "   • No remaining duplicates found\n"
        "   • Deletion process was successful\n"
        "   • Your image collection is now clean!"
    )
else:
    # Count every file that still belongs to some duplicate group
    total_remaining_duplicates = sum(map(len, remaining_duplicates))
    print(
        "\n📊 VERIFICATION SUMMARY:\n"
        f"   • Remaining duplicate groups: {len(remaining_duplicates)}\n"
        f"   • Total remaining duplicate files: {total_remaining_duplicates}\n"
        "   • You may want to review these manually or adjust the similarity threshold"
    )
