# Deduplication Pipeline
This notebook demonstrates an image deduplication workflow using multiple stages:
1. MD5 byte comparison
2. Perceptual hashing (pHash and dHash)
3. CNN-based similarity

The results of each stage are merged, and duplicate images are moved to review folders.

In [None]:
import tensorflow as tf, numpy as np, google.protobuf as pb
# Report the versions of the core libraries, then the GPUs TensorFlow can see.
for label, module in (('TF :', tf), ('NP :', np), ('PB :', pb)):
    print(label, module.__version__)
print('GPUs:', tf.config.list_physical_devices('GPU'))

In [None]:
from pathlib import Path
import hashlib, json, shutil
from collections import defaultdict
from tqdm import tqdm
from PIL import Image

from imagededup.methods import PHash, DHash, CNN
from imagededup.utils import plot_duplicates

In [None]:
# Directory layout: the source images plus two sub-folders created by this notebook.
IMAGE_DIR = Path('E:/DeepLearning/Data/Mika-Pikazo_Full')
WORK_DIR = IMAGE_DIR / '_dedup_meta'               # JSON output of the stages
REVIEW_DIR = IMAGE_DIR / '_duplicates_to_review'   # one sub-folder per cluster

# Create both output folders up front; folders that already exist are left alone.
WORK_DIR.mkdir(parents=True, exist_ok=True)
REVIEW_DIR.mkdir(parents=True, exist_ok=True)

# Echo the resolved locations so the configuration can be checked at a glance.
for label, folder in (
    ('IMAGE_DIR :', IMAGE_DIR),
    ('WORK_DIR  :', WORK_DIR),
    ('REVIEW_DIR:', REVIEW_DIR),
):
    print(label, folder.resolve())

In [None]:
def md5sum(fp, blocksize=1 << 16):
    """Return the hexadecimal MD5 digest of the file at *fp*.

    The file is opened in binary mode and read ``blocksize`` bytes at a
    time, so the whole file is never held in memory at once.
    """
    digest = hashlib.md5()
    with open(fp, 'rb') as stream:
        while True:
            chunk = stream.read(blocksize)
            if not chunk:  # an empty read signals end of file
                break
            digest.update(chunk)
    return digest.hexdigest()

def merge_duplicate_maps(*maps):
    """Merge several duplicate maps into one symmetric map.

    Each input maps a name to an iterable of names considered its
    duplicates. Every such pair is recorded in both directions, so if
    ``a`` lists ``b`` then the result also has ``b`` listing ``a``.

    Returns a plain dict mapping each name that took part in at least one
    pair to the sorted list of its partners (the name itself excluded).
    """
    links = {}
    for dup_map in maps:
        for name, dupes in dup_map.items():
            for other in dupes:
                # Record the relation both ways to keep the map symmetric.
                links.setdefault(name, set()).add(other)
                links.setdefault(other, set()).add(name)

    result = {}
    for name, partners in links.items():
        if partners:
            result[name] = sorted(partners - {name})
    return result


In [None]:
# Stage 0: exact file matches using MD5
md5_dup_map = defaultdict(list)
hash_buckets = defaultdict(list)

# Bucket every regular file directly inside IMAGE_DIR by the MD5 of its bytes.
for img in tqdm(IMAGE_DIR.glob('*')):
    if img.is_file():
        digest = md5sum(img)
        hash_buckets[digest].append(img.name)

# Only digests shared by two or more files are byte-level duplicates; keeping
# just those makes len(byte_dups) the number of duplicate groups rather than
# the number of distinct digests.
byte_dups = {
    digest_key: names
    for digest_key, names in hash_buckets.items()
    if len(names) > 1
}

# Filename-keyed view: each duplicated file maps to the other files in its group.
for file_list in byte_dups.values():
    for f in file_list:
        md5_dup_map[f] = [x for x in file_list if x != f]

# Use a context manager so the JSON file is flushed and closed deterministically.
with open(WORK_DIR / 'stage0_md5_filename_keys.json', 'w') as fh:
    json.dump(md5_dup_map, fh, indent=2)

In [None]:
# Stage 1: perceptual hashing
phasher, dhasher = PHash(), DHash()

# Encode every image in IMAGE_DIR once per hashing method.
ph_enc = phasher.encode_images(image_dir=IMAGE_DIR)
dh_enc = dhasher.encode_images(image_dir=IMAGE_DIR)

# Look up duplicates per method, passing a maximum distance threshold of 2.
ph_dups = phasher.find_duplicates(encoding_map=ph_enc, max_distance_threshold=2)
dh_dups = dhasher.find_duplicates(encoding_map=dh_enc, max_distance_threshold=2)

# A pair flagged by either hash counts as a duplicate; the merge is symmetric.
hash_dups = merge_duplicate_maps(ph_dups, dh_dups)

# Use a context manager so the JSON file is flushed and closed deterministically.
with open(WORK_DIR / 'stage1_hash_dups.json', 'w') as fh:
    json.dump(hash_dups, fh, indent=2)
print(f'Stage-1 produced {len(hash_dups)} duplicate clusters.')

In [None]:
# Stage 2: CNN similarity search
cnn = CNN()
all_cnn_enc = cnn.encode_images(image_dir=IMAGE_DIR)

# The raw result is also written to WORK_DIR by imagededup via `outfile`.
cnn_dups = cnn.find_duplicates(
    encoding_map=all_cnn_enc,
    min_similarity_threshold=0.95,
    outfile=str(WORK_DIR/'stage2_cnn_dups.json')
)

# NOTE(review): imagededup appears to return an entry for every encoded image,
# with an empty list when nothing matched - confirm against the library docs.
# Counting only non-empty entries is correct either way, so the figure reflects
# images that actually have a CNN match instead of the size of the directory.
cnn_match_count = sum(1 for dupes in cnn_dups.values() if dupes)
print(f'CNN pass produced {cnn_match_count} clusters (whole dir).')

In [None]:
# Merge results from all stages
def merge_maps_union(*maps):
    """Union several duplicate maps key by key.

    Each input maps a name to an iterable of duplicate names. For every key
    the duplicates from all maps are combined; the key itself is removed
    from its own list.

    Returns a dict mapping each key to the sorted list of its duplicates.
    Keys that end up with no duplicates are omitted. This includes keys
    whose only listed duplicate was the key itself, which would otherwise
    survive as an empty entry and later turn into a one-file "cluster".
    """
    merged = {}
    for m in maps:
        for k, vals in m.items():
            merged.setdefault(k, set()).update(vals)

    result = {}
    for k, v in merged.items():
        others = v - {k}
        # Filter after removing the self-reference, not before.
        if others:
            result[k] = sorted(others)
    return result

# Combine the duplicate maps of the MD5, perceptual-hash and CNN stages.
stage_maps = (md5_dup_map, hash_dups, cnn_dups)
merged_dups = merge_maps_union(*stage_maps)
print(f'Union map clusters: {len(merged_dups)}')

In [None]:
# Convert duplicate map to cluster list
def dict_to_clusters(dup_map):
    """Group a duplicate map into disjoint clusters of names.

    ``dup_map`` maps a name to an iterable of names considered its
    duplicates. Names are grouped transitively: if ``a`` is linked to ``b``
    and ``b`` to ``c``, all three share one cluster, whichever direction the
    links were recorded in. Every name therefore appears in exactly one
    cluster, which keeps later per-cluster file moves from touching the
    same file twice.

    Returns a list of clusters, each a sorted list of names, ordered by the
    first appearance of any of their members as a key of ``dup_map``.
    """
    # Build an undirected adjacency map so one-directional links still connect.
    adjacency = {}
    for key, dupes in dup_map.items():
        adjacency.setdefault(key, set()).update(dupes)
        for name in dupes:
            adjacency.setdefault(name, set()).add(key)

    clusters, seen = [], set()
    for start in dup_map:
        if start in seen:
            continue
        # Depth-first walk collecting everything reachable from `start`.
        component, stack = set(), [start]
        while stack:
            node = stack.pop()
            if node in component:
                continue
            component.add(node)
            stack.extend(adjacency[node] - component)
        seen |= component
        clusters.append(sorted(component))
    return clusters

# Collapse the merged duplicate map into the list of clusters to review.
clusters = dict_to_clusters(merged_dups)
cluster_total = len(clusters)
print(f'Total clusters to review: {cluster_total}')

In [None]:
# Preview a few clusters
dup_map = merged_dups
visited = set()
shown = 0
MAX_SHOWN = 5  # stop after plotting this many groups

# The newline escape must stay inside the literal; a raw line break within a
# single-quoted string is a syntax error.
print('\n=== DUPLICATE GROUPS ===')
for root, group in dup_map.items():
    # Skip files already shown as a member of an earlier group, and entries
    # that have no duplicates to plot.
    if root in visited or not group:
        continue
    plot_duplicates(str(IMAGE_DIR), dup_map, filename=root)
    print(f'Cluster size = {len(group) + 1}')
    visited.update(group)
    visited.add(root)
    shown += 1
    if shown >= MAX_SHOWN:
        break

In [None]:
# Move duplicates to review folders
def image_resolution(path: Path) -> int:
    """Return the pixel count (width times height) of the image at *path*."""
    with Image.open(path) as picture:
        width, height = picture.size
        return width * height

def _quality_key(fn):
    """Rank a file in IMAGE_DIR by (pixel count, byte size); larger is better.

    Files Pillow cannot open rank with a pixel count of 0 instead of
    aborting the whole run. Stage 0 hashes every file in the folder, not
    only images, so such files can end up in a cluster.
    """
    path = IMAGE_DIR / fn
    try:
        pixels = image_resolution(path)
    except OSError:  # Pillow's UnidentifiedImageError is an OSError subclass
        pixels = 0
    return (pixels, path.stat().st_size)


moved_files = []
for idx, cluster in enumerate(clusters, 1):
    # Only files still present in IMAGE_DIR can be compared or moved.
    live = [fn for fn in cluster if (IMAGE_DIR / fn).exists()]

    # The kept image always stays in IMAGE_DIR, so a cluster handled by an
    # earlier run has exactly one live file left; fewer than two live files
    # means there is nothing left to deduplicate.
    if len(live) < 2:
        print(f'Cluster {idx:04d} already processed, skip.')
        continue

    # Keep the highest-resolution file, breaking ties by file size.
    best_img = max(live, key=_quality_key)

    group_dir = REVIEW_DIR / f'cluster_{idx:04d}'
    group_dir.mkdir(parents=True, exist_ok=True)

    for fn in live:
        src = IMAGE_DIR / fn
        dst = group_dir / fn
        if fn == best_img:
            # The keeper stays in place; a copy goes to the review folder
            # so the whole cluster can be compared side by side.
            shutil.copy2(src, dst)
        else:
            shutil.move(src, dst)
            moved_files.append(fn)

    print(f'Cluster {idx:04d}: kept {best_img}, moved {len(live)-1} dupes.')

print('All clusters processed; review folders ready.')

In [None]:
# Count regular files only: IMAGE_DIR also holds the WORK_DIR and REVIEW_DIR
# folders, which a bare glob('*') would count as images.
remaining_files = sum(1 for entry in IMAGE_DIR.glob('*') if entry.is_file())
total_checked = remaining_files + len(moved_files)

# Report only entries that actually have duplicates: MD5 buckets shared by
# two or more files, and CNN entries with a non-empty match list.
md5_dup_groups = sum(1 for names in byte_dups.values() if len(names) > 1)
cnn_dup_entries = sum(1 for dupes in cnn_dups.values() if dupes)

print(f"""
======== SUMMARY ========
Images processed : {total_checked}
MD5 duplicates   : {md5_dup_groups}
Hash duplicates  : {len(hash_dups)}
CNN duplicates   : {cnn_dup_entries}
Files moved      : {len(moved_files)}
Meta saved to    : {WORK_DIR}
Dup moved to     : {REVIEW_DIR}
""")