# Phase 0-1: Dataset Deduplication

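Utilities for removing byte-identical duplicate images (and, optionally, their label files) from a dataset in place, with an optional backup taken beforehand.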

In [None]:
# Core imports
import os
import hashlib
from pathlib import Path
from collections import defaultdict
from datetime import datetime
import shutil

from tqdm import tqdm  # progress bars

# Confirm the imports above succeeded and show the absolute working directory.
print(f"Imports OK. CWD: {Path().resolve()}")

In [None]:
# Paths are machine-specific. Adjust PROJECT_ROOT before running on a new machine.
PROJECT_ROOT = "/Users/tyreecruse/Desktop/CS230/Project/Data/Original"
# Alternative:
# PROJECT_ROOT = os.getcwd()

# Settings read by the backup and deduplication steps.
CONFIG = {
    # Dataset directory (input and output - modified in-place)
    "dataset_path": os.path.join(PROJECT_ROOT, "master_dataset_pool"),

    # Backup settings
    "create_backup": True,
    "backup_dir": os.path.join(PROJECT_ROOT, "dataset_backups"),

    # Processing parameters
    "chunk_size": 64 * 1024,  # bytes read per call while hashing a file
    "remove_labels": True,    # remove labels for duplicate images
}

print("Config")
print("------")
# Iterate over CONFIG itself so that every setting is shown, including
# remove_labels, which decides whether label files get deleted.
for key, value in CONFIG.items():
    print(f"{key:>14}: {value}")

# Quick sanity check on the dataset: report how many files sit directly under
# images/, or warn when that directory is missing.
dataset_path = Path(CONFIG["dataset_path"])
images_dir = dataset_path / "images"

if not images_dir.exists():
    print(f"\n[warning] images/ not found under {dataset_path}")
    print("          Run consolidation or adjust CONFIG['dataset_path'].")
else:
    n_images = sum(1 for p in images_dir.iterdir() if p.is_file())
    print(f"\nFound {n_images:,} image files under {images_dir}")

In [None]:
def get_file_hash(path, chunk_size=65536):
    """Return MD5 hex digest of the file at `path`.

    The file is opened in binary mode and fed to the hash in pieces of
    `chunk_size` bytes, so a large file is not loaded into memory at once.

    Args:
        path: File to hash; anything accepted by ``open()``.
        chunk_size: Number of bytes requested per read. Must not be 0.

    Returns:
        The digest as a hexadecimal string.

    Raises:
        ValueError: If `chunk_size` is 0.
        OSError: If the file cannot be opened or read.
    """
    # read(0) returns b"" immediately, which would end the loop before any
    # data is hashed: every file would get the digest of empty input and look
    # like a duplicate of every other file.
    if chunk_size == 0:
        raise ValueError("chunk_size must be non-zero")

    digest = hashlib.md5()
    with open(path, "rb") as f:
        # iter(callable, sentinel) keeps reading until read() returns b"" (EOF)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

In [None]:
def create_backup(dataset_path, backup_dir):
    """Copy dataset_path into backup_dir/<name>_backup_<timestamp> and return the new path.

    Args:
        dataset_path: Directory to back up.
        backup_dir: Directory that receives the backup; created (with
            parents) if it does not exist yet.

    Returns:
        Path of the newly created backup directory.

    Raises:
        ValueError: If `backup_dir` is `dataset_path` itself or lies inside it.
        OSError: Propagated from ``shutil.copytree`` / ``mkdir``, e.g. when
            `dataset_path` does not exist or the target already exists.
    """
    dataset_path = Path(dataset_path)
    backup_dir = Path(backup_dir)

    # A backup directory inside the dataset would make copytree copy the
    # backup into itself. Compare resolved paths so that relative paths and
    # symlinks cannot hide the nesting. Checked before anything is created.
    source_real = dataset_path.resolve()
    target_real = backup_dir.resolve()
    if target_real == source_real or source_real in target_real.parents:
        raise ValueError(
            f"backup_dir must be outside the dataset: {backup_dir} is inside {dataset_path}"
        )

    backup_dir.mkdir(parents=True, exist_ok=True)

    # Timestamp (second resolution) keeps successive backups apart
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"{dataset_path.name}_backup_{ts}"

    print(f"\nCreating backup at {backup_path}")
    shutil.copytree(dataset_path, backup_path)

    return backup_path

In [None]:
def deduplicate_dataset(dataset_dir, config):
    """Deduplicate images (and optionally labels) in-place; return a stats dict.

    Files directly under ``<dataset_dir>/images`` are grouped by the digest
    returned by ``get_file_hash``. In every group with more than one file the
    first file in sorted path order is kept and the others are deleted. When
    label removal is enabled and ``<dataset_dir>/labels`` exists, the
    ``<stem>.txt`` label of a deleted image is deleted as well, unless another
    image with the same stem is still present (e.g. ``a.jpg`` and ``a.jpeg``
    share ``a.txt``), in which case the label is kept.

    Args:
        dataset_dir: Dataset root containing ``images/`` and, optionally,
            ``labels/``.
        config: Mapping of settings. Reads ``"chunk_size"`` (bytes per read
            while hashing, default 65536) and ``"remove_labels"`` (default
            True).

    Returns:
        dict with the keys ``total_images``, ``duplicates_removed``,
        ``labels_removed``, ``unique_images`` and ``duplicate_groups``.
        ``unique_images`` is ``total_images - duplicates_removed``, so files
        that could not be hashed or removed are still counted in it.

    Raises:
        FileNotFoundError: If ``<dataset_dir>/images`` is not a directory.
    """
    dataset_dir = Path(dataset_dir)
    images_dir = dataset_dir / "images"
    labels_dir = dataset_dir / "labels"

    if not images_dir.is_dir():
        raise FileNotFoundError(f"images/ directory not found: {images_dir}")

    # Sorted so that the copy kept from each duplicate group does not depend
    # on the order in which the directory happens to be listed.
    image_files = sorted(p for p in images_dir.iterdir() if p.is_file())

    stats = {
        "total_images": len(image_files),
        "duplicates_removed": 0,
        "labels_removed": 0,
        "unique_images": 0,
        "duplicate_groups": 0,
    }

    if not image_files:
        print(f"No files found under {images_dir}")
        return stats

    print(f"\nScanning {images_dir} for duplicates "
          f"({stats['total_images']:,} files)")

    # Same tolerant lookup as remove_labels below: a config without
    # chunk_size falls back to the default instead of raising KeyError.
    chunk_size = config.get("chunk_size", 64 * 1024)

    # Build hash map -> list of files with that hash
    hash_to_files = defaultdict(list)
    for img_path in tqdm(image_files, desc="hash", unit="file"):
        try:
            h = get_file_hash(img_path, chunk_size)
        except OSError as exc:
            # Unreadable file: skip it (it is never deleted). Anything other
            # than an I/O error is a configuration problem and propagates.
            print(f"[hash] skip {img_path.name}: {exc}")
            continue
        hash_to_files[h].append(img_path)

    # Only keep entries that actually have duplicates
    duplicate_groups = [paths for paths in hash_to_files.values()
                        if len(paths) > 1]
    stats["duplicate_groups"] = len(duplicate_groups)

    if not duplicate_groups:
        print("No duplicate hashes found.")
        stats["unique_images"] = stats["total_images"]
        return stats

    remove_labels = config.get("remove_labels", True) and labels_dir.is_dir()

    # Number of images still on disk per stem. A label is only deleted once
    # no remaining image could be using it.
    images_per_stem = defaultdict(int)
    for img_path in image_files:
        images_per_stem[img_path.stem] += 1

    # Remove everything except the first file in each duplicate group
    for paths in tqdm(duplicate_groups, desc="remove", unit="group"):
        for dup in paths[1:]:
            try:
                dup.unlink()
            except OSError as exc:
                print(f"[rm] could not remove {dup.name}: {exc}")
                continue
            stats["duplicates_removed"] += 1
            images_per_stem[dup.stem] -= 1

            if not remove_labels or images_per_stem[dup.stem] > 0:
                continue

            label_path = labels_dir / f"{dup.stem}.txt"
            try:
                label_path.unlink()
            except FileNotFoundError:
                # The removed image simply had no label file
                continue
            except OSError as exc:
                print(f"[rm] could not remove label {label_path.name}: {exc}")
            else:
                stats["labels_removed"] += 1

    stats["unique_images"] = stats["total_images"] - stats["duplicates_removed"]
    return stats

In [None]:
def main():
    """Back up the dataset if configured, deduplicate it, and print a summary.

    Settings come from the module-level CONFIG. An exception raised by the
    backup, the deduplication or the reporting is printed and recorded instead
    of being propagated.

    Returns:
        dict that may hold ``"backup_path"`` (str), ``"deduplication"`` (the
        stats dict from ``deduplicate_dataset``) and, on failure, ``"error"``
        (the exception message).
    """
    outcome = {}
    target_dir = Path(CONFIG["dataset_path"])

    try:
        # The backup runs first, so a failed copy aborts before any deletion
        if CONFIG.get("create_backup", False):
            outcome["backup_path"] = str(
                create_backup(target_dir, CONFIG["backup_dir"])
            )

        stats = deduplicate_dataset(target_dir, CONFIG)
        outcome["deduplication"] = stats

        # Short report of what was done
        print("\nDeduplication summary")
        print("-" * 30)
        print(f"  original images : {stats['total_images']:,}")
        print(f"  removed images  : {stats['duplicates_removed']:,}")
        print(f"  removed labels  : {stats['labels_removed']:,}")
        print(f"  unique images   : {stats['unique_images']:,}")

        total = stats["total_images"]
        if total:
            # Share of the original files that were removed as duplicates
            rate = 100 * stats["duplicates_removed"] / total
            print(f"  deduplication   : {rate:.1f}%")

        print(f"\nData location     : {target_dir}")
        saved_backup = outcome.get("backup_path")
        if saved_backup is not None:
            print(f"Backup            : {saved_backup}")

    except Exception as exc:
        outcome["error"] = str(exc)
        print(f"\nDeduplication failed: {exc}")

    return outcome


# Run the pipeline when this file is the entry point. The result is stored in
# a module-level name, which the dataset-check code further down reads back.
if __name__ == "__main__":
    deduplication_results = main()

In [None]:
# Post-run check: count what is on disk now and, if a run was recorded,
# repeat its key numbers.
dataset_path = Path(CONFIG["dataset_path"])
images_dir = dataset_path / "images"
labels_dir = dataset_path / "labels"

print("\nDataset check")
print("-------------")

if images_dir.is_dir():
    image_files = [p for p in images_dir.iterdir() if p.is_file()]
    label_files = list(labels_dir.glob("*.txt")) if labels_dir.is_dir() else []

    n_img, n_lbl = len(image_files), len(label_files)
    print(f"  images : {n_img:,}")
    print(f"  labels : {n_lbl:,}")
    if n_img:
        # Number of label files relative to the number of image files
        cov = 100 * n_lbl / n_img
        print(f"  label/image: {cov:.1f}%")

    if image_files:
        print("\n  sample files:")
        for p in image_files[:5]:
            print("   -", p.name)
        if n_img > 5:
            print(f"   ... (+{n_img - 5:,} more)")
else:
    print(f"  images directory not found at {images_dir}")

# Tie back to the run stats, if available
if 'deduplication_results' in globals() and isinstance(deduplication_results, dict):
    # A failed run records "error" and no "deduplication" entry, so look the
    # stats up tolerantly instead of indexing (which raised KeyError).
    stats = deduplication_results.get('deduplication')
    if stats is None:
        reason = deduplication_results.get('error', 'no deduplication stats recorded')
        print(f"\nRun summary unavailable: {reason}")
    else:
        print("\nRun summary")
        print("-----------")
        print(f"  removed images : {stats['duplicates_removed']:,}")
        print(f"  removed labels : {stats['labels_removed']:,}")
        print(f"  unique images  : {stats['unique_images']:,}")