# <ins>Remove Duplicates From Specified Directories</ins>

### <ins>1. Imports</ins>

In [5]:
import os
import numpy as np
import imghdr
from datetime import datetime
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity as cosine
import cv2
from PIL import Image
import urllib
import tkinter as tk
from tkinter import filedialog
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [6]:
# Plots in external window.
# `%matplotlib qt` is an IPython magic, not plain Python: this cell only runs
# inside IPython/Jupyter.
%matplotlib qt

import warnings
# NOTE(review): the body of this context manager is `pass`, so no code runs
# while it is active and it suppresses nothing. If the intent was to silence
# library warnings, the noisy calls would have to live inside the block.
with warnings.catch_warnings():
    pass

### <ins>2. Functions used by the tool</ins>

In [7]:
def getImagePaths(directory: str):
    """
    List the image files that sit directly inside a directory.

    Parameters
    ----------
    directory : directory in which to search for images (sub-directories are not searched)
    Returns
    -------
    List of full filepaths to every regular file in the directory that imghdr
    recognises as an image, in the order returned by os.listdir
    """
    imagePaths = []
    for entry in os.listdir(directory):
        fullPath = os.path.join(directory, entry)
        # imghdr.what opens the path it is given, so a sub-directory would
        # raise instead of being skipped; test for a regular file first.
        if os.path.isfile(fullPath) and imghdr.what(fullPath):
            imagePaths.append(fullPath)
    return imagePaths

def encodeImages(filepaths: list, model):
    """
    Compute one embedding per image file.

    Parameters
    ----------
    filepaths : full filepaths to a list of images
    model: Sentence Transformer model 
    Returns
    -------
    A matrix of embeddings which has size NxM, where N is the number of files for which embeddings have been computed, and M the embedding dimensionality
    """    
    images = []
    for path in filepaths:
        # Image.open is lazy and keeps the underlying file open until the
        # pixel data has been read. Loading inside the context manager reads
        # the data and then releases the handle, so a large directory does not
        # exhaust the process's open-file limit. The loaded image stays usable
        # after the with-block.
        with Image.open(path) as img:
            img.load()
            images.append(img)
    return model.encode(images, batch_size=128, convert_to_tensor=True, show_progress_bar=True)
     
def load_model():
    """
    Announce on stdout that the model is being loaded, then build it.

    Returns
    -------
    A SentenceTransformer instance created from the 'clip-ViT-B-32' checkpoint name
    """
    modelName = 'clip-ViT-B-32'
    print('Loading CLIP Model...')
    clipModel = SentenceTransformer(modelName)
    return clipModel


def cosineDistance(firstEnc, secondEnc, chunksize: int = 10):
    """
    Compute pairwise cosine similarities between two sets of embeddings, block by block.

    Despite the name, the values returned are similarities (higher means more alike),
    not distances.

    Parameters
    ----------
    firstEnc, secondEnc : Embedding matrices (one row per image) for the two directories in which duplicates are being removed.
                          NumPy arrays are used as-is; objects exposing detach()/cpu()/numpy() (e.g. tensors) are converted first.
    chunksize: Set to a default of 10, this controls how many embeddings are compared at any given point in the computation
               (each matrix is split into `rows // chunksize + 1` pieces)
    Returns
    -------
    A matrix of shape (N, M) holding the pairwise similarities between every image in firstEnc and secondEnc.
    When either input has no rows the (empty) zero matrix is returned without computing anything.
    """
    def _toNumpy(enc):
        # Embeddings produced with convert_to_tensor=True may live on an
        # accelerator; bring them to host memory before handing them to NumPy.
        if hasattr(enc, "detach"):
            enc = enc.detach().cpu().numpy()
        return np.asarray(enc)

    firstEnc, secondEnc = _toNumpy(firstEnc), _toNumpy(secondEnc)
    N, M = firstEnc.shape[0], secondEnc.shape[0]
    simMatrix = np.zeros((N, M))
    if N == 0 or M == 0:
        # Nothing to compare; the similarity routine rejects empty inputs.
        return simMatrix

    rowStart = 0
    for nchunk in np.array_split(firstEnc, N // chunksize + 1):
        n = nchunk.shape[0]
        colStart = 0
        for mchunk in np.array_split(secondEnc, M // chunksize + 1):
            m = mchunk.shape[0]
            # array_split can emit an empty piece (e.g. chunksize=1); skip it.
            if n and m:
                simMatrix[rowStart:rowStart + n, colStart:colStart + m] = cosine(nchunk, mchunk)
            colStart += m
        rowStart += n
    return simMatrix

def computeDuplicatesOneDirectory(array, threshold):
    """
    Group the images of a single directory into duplicate sets.

    Only the part of each row to the right of the diagonal is inspected, so an
    image is never paired with itself and each pair is considered once. An image
    that has already been recorded as a duplicate is neither used as a key nor
    listed again under a later key, so every image appears in at most one group.

    Parameters
    ----------
    array : Square matrix of pairwise similarities 
    threshold: Similarity which must be strictly exceeded for two images to be considered duplicates
    Returns
    -------
    A dictionary with the key being the index of an image and the value being a dictionary
    mapping the index of each of its duplicates to the similarity with the key image
    """      
    claimed = set()
    dupDict = {}

    for i in range(array.shape[0]):
        if i in claimed:
            continue
        row = array[i]
        candidates = i + 1 + np.flatnonzero(row[i + 1:] > threshold)
        # Drop images that an earlier key has already claimed; otherwise the
        # same file would be offered for deletion in two different groups.
        fresh = [int(j) for j in candidates if int(j) not in claimed]
        if fresh:
            claimed.update(fresh)
            dupDict[i] = {j: float(row[j]) for j in fresh}
    return dupDict

def computeDuplicatesTwoDirectories(array, threshold):
    """
    Match each row image against the column images it is similar to.

    Rows are visited in order; a column index recorded for one row is not
    offered again to any later row.

    Parameters
    ----------
    array : Matrix of pairwise similarities (rows: first directory, columns: second directory)
    threshold: Similarity which must be strictly exceeded for two images to be considered duplicates
    Returns
    -------
    A dictionary keyed by row index; each value maps the matched column indices to their similarity with that row
    """
    takenCols = set()
    matches = {}

    for rowIdx in range(array.shape[0]):
        row = array[rowIdx]
        available = set(np.nonzero(row > threshold)[0]) - takenCols
        if not available:
            continue
        cols = list(available)
        matches[rowIdx] = dict(zip(cols, row[cols]))
        takenCols.update(available)
    return matches

def getDuplicatesTwoDirectories(firstDir: str, secondDir: str, threshold: float):
    """
    Find images in the second directory that duplicate images in the first.

    Parameters
    ----------
    firstDir/secondDir : Paths to first/second directories
    threshold: Minimum threshold similarity above which two images are considered duplicates
    Returns
    -------
    duplicates   - a dictionary with the key being the index of an image in the first directory and the values being
                   the indices of the images in the second directory which are its duplicates (empty when either
                   directory contains no images)
    baseImgPaths - a list of all image filepaths in the first directory
    compImgPaths - a list of all image filepaths in the second directory
    """
    # Resolve the file lists first so that a bad path fails before the model is loaded.
    baseImgPaths, compImgPaths = getImagePaths(firstDir), getImagePaths(secondDir)
    if not baseImgPaths or not compImgPaths:
        # Nothing to compare; encoding or comparing an empty set would fail.
        return {}, baseImgPaths, compImgPaths

    model = load_model()
    baseEncodings, compEncodings = encodeImages(baseImgPaths, model), encodeImages(compImgPaths, model)

    simMatrix = cosineDistance(baseEncodings, compEncodings)
    duplicates = computeDuplicatesTwoDirectories(simMatrix, threshold)
    return duplicates, baseImgPaths, compImgPaths

def getDuplicatesOneDirectory(firstDir: str, threshold: float):
    """
    Find duplicate images within a single directory.

    Parameters
    ----------
    firstDir : Path to the directory
    threshold: Minimum threshold similarity above which two images are considered duplicates
    Returns
    -------
    duplicates   - a dictionary with the key being the index of an image and the values being the indices of the images
                   which are its duplicates (empty when the directory holds fewer than two images)
    baseImgPaths - a list of all image filepaths in the directory
    baseImgPaths - the same list again, so the result has the same three-part shape as getDuplicatesTwoDirectories
    """
    baseImgPaths = getImagePaths(firstDir)
    if len(baseImgPaths) < 2:
        # A duplicate needs at least two images; skip loading the model.
        return {}, baseImgPaths, baseImgPaths

    model = load_model()
    baseEncodings = encodeImages(baseImgPaths, model)

    simMatrix = cosineDistance(baseEncodings, baseEncodings)
    duplicates = computeDuplicatesOneDirectory(simMatrix, threshold)
    return duplicates, baseImgPaths, baseImgPaths

In [8]:
class SelectImage:
    """
    Click handler for one of the two subplots of a Visualise figure.

    Clicking inside the subplot toggles the "selected for deletion" flag of the
    image currently shown there and redraws it with a red (selected) or white
    (not selected) border.
    """
    BORDER_WIDTH = 20
    SELECTED_COLOUR = (255, 0, 0)
    UNSELECTED_COLOUR = (255, 255, 255)

    def __init__(self, viz, fig, ax, axIdx):
        """
        Parameters
        ----------
        viz : The Visualise instance whose currImgs/currIdxs/currIdxsSelected/j are read and updated
        fig : Figure whose canvas delivers the mouse events
        ax : The subplot this handler is responsible for
        axIdx : Offset of this subplot relative to viz.j (0 for the first subplot, 1 for the second)
        """
        self.viz, self.fig, self.ax, self.axIdx = viz, fig, ax, axIdx
        self.connect()
    
    def connect(self):
        """Connect to the press event created."""
        self.cidpress = self.fig.canvas.mpl_connect('button_press_event', self.on_press)
    
    def on_press(self, event):
        """
        Check whether mouse is over the subplot, and if so, change the selected flag
        Parameters
        ----------
        event : Automatically passed in the callback function
        Returns
        -------
        """
        if event.inaxes != self.ax:
            return
        
        contains, _ = self.ax.contains(event)
        if not contains:
            return

        # Position, within the current duplicate group, of the image shown in this subplot.
        pos = (self.viz.j + self.axIdx) % len(self.viz.currIdxs)
        self.img = self.viz.currImgs[pos]

        nowSelected = not self.viz.currIdxsSelected[pos]
        colour = self.SELECTED_COLOUR if nowSelected else self.UNSELECTED_COLOUR
        bordered = cv2.copyMakeBorder(
            self.img,
            self.BORDER_WIDTH, self.BORDER_WIDTH, self.BORDER_WIDTH, self.BORDER_WIDTH,
            cv2.BORDER_CONSTANT, value=colour,
        )

        # imshow adds a new image artist on every call; drop the previous ones
        # so repeated clicks do not pile up hidden images on the axes.
        for oldImage in list(self.ax.images):
            oldImage.remove()
        self.ax.imshow(bordered)
        self.fig.canvas.draw_idle()
        self.viz.currIdxsSelected[pos] = nowSelected
            
    def disconnect(self):
        """Disconnect the press event"""
        self.fig.canvas.mpl_disconnect(self.cidpress)

In [9]:
class Visualise:
    """
    Two-panel viewer for stepping through groups of duplicate images.

    One group (an entry of the dups dictionary) is active at a time. Two images
    of the group are shown side by side; next/prev move through the group two
    images at a time (wrapping around) and deleteContinue removes the images
    flagged in currIdxsSelected and advances to the next group.
    """

    def __init__(self, dups, firstDirPaths, secondDirPaths, i, j):  
        """
        Parameters
        dups - Dictionary mapping an index into firstDirPaths to a dictionary {index into secondDirPaths: similarity}
        firstDirPaths/secondDirPaths - Image paths that the outer/inner keys of dups index into
        i - Position (not key) of the group of dups to show first
        j - Position within the group of the first of the two images shown
        """
        self.dups, self.firstDirPaths, self.secondDirPaths, self.i, self.j = dups, firstDirPaths, secondDirPaths, i, j
        self.numKeys = len(self.dups)
        # Example dups dictionary:
        #     {
        #      1: {2: 1.0000001192092896, 4: 0.9872920513153076, 5: 0.9872920513153076},
        #      7: {8: 1.0000001192092896, 9: 0.9872920513153076}
        #     }
        #
        #     i - Key index of element of dups dictionary currently plotted
        #     j - Index of first of two images currently plotted
        #
        #     The group for i=0 is [1, 2, 4, 5] (outer key first, then inner keys),
        #     so j=0 shows images 1 and 2, and j=1 shows images 2 and 4.

        #Plot axes
        self.fig, self.axes = plt.subplots(1, 2, figsize=(20,20))
        for axis in self.axes:
            axis.axis("off")
        self.getCurrFiles(self.i)

        # Keep both positions inside the group, exactly as next/prev do.
        self.j = self.j % len(self.currIdxs)
        self.plotImgs(self.j, (self.j + 1) % len(self.currIdxs))
            
    def getIdx(self, key: int):
        """
        Parameters
        Key - The index of the position of the node in firstDir
        Returns
        -------
        The index of the node in firstDir
        """
        return list(self.dups)[key]
    
    def getDupIdx(self, key: int):
        """
        Parameters
        Key - The index of the position of the node in firstDir
        Returns
        -------
        The dictionary {index in secondDir: similarity} of the duplicate nodes
        """
        return self.dups[self.getIdx(key)]
    
    def getFirstDirPathFromIdx(self, idx: int):
        """
        Parameters
        idx - The index of the node in firstDir
        Returns
        -------
        The path to the node indexed by idx in firstDir
        """
        return self.firstDirPaths[idx]
    
    def getSecondDirPathsFromIdxs(self, idxs: list):
        """
        Parameters
        idxs - The indices of the nodes in secondDir
        Returns
        -------
        A generator over the paths to the nodes indexed by idxs in secondDir
        """        
        for idx in idxs:
            yield self.secondDirPaths[idx]
            
    def concatOriginalDupIdxs(self, key: int):
        """
        Parameters
        Key - The index of the position of the node in firstDir
        Returns
        -------
        Two lists of equal length:
        a) the index of the node in firstDir followed by the indices of the duplicate nodes in secondDir
        b) the corresponding file paths
        """  
        firstDirIdx = self.getIdx(key)
        secondDirIdxs = list(self.getDupIdx(key))
        idxs = [firstDirIdx] + secondDirIdxs
        paths = [self.getFirstDirPathFromIdx(firstDirIdx)] + list(self.getSecondDirPathsFromIdxs(secondDirIdxs))
        return idxs, paths
                                                                                              
    def getNameFromPath(self, paths: list):
        """
        Parameters
        paths - Paths to images
        Returns
        -------
        A generator over the file names (last path component) of the images
        """                       
        for path in paths:
            yield os.path.basename(path)
        
    def readImg(self, path: str):
        """
        Parameters
        path - Path to image
        Returns
        -------
        Numpy array of the image: the first three channels of a 3-dimensional image,
        or the unchanged array for a 2-dimensional one. None for anything else.
        """ 
        img = mpimg.imread(path)
        if isinstance(img, np.ndarray):
            if img.ndim == 3:
                #Remove the fourth colour channel, which is used for png images
                return img[..., 0:3]
            if img.ndim == 2:
                return img
        return None

    def getCurrFiles(self, key: int):
        """
        Load everything needed to display the group at position key and reset its selection flags.

        Parameters
        Key - The index of the position of the node in firstDir
        Returns
        -------
        """
        self.currIdxs, self.currPaths = self.concatOriginalDupIdxs(key)
        self.currNames = list(self.getNameFromPath(self.currPaths))
        self.currImgs = [self.readImg(path) for path in self.currPaths]
        self.currIdxsSelected = [False]*len(self.currIdxs)
        
    def plotImg(self, axis, name: str, img: np.ndarray, fs: int):
        """
        Parameters
        axis - Axis in which image is to be plotted
        name - title of image as it appears in the filename
        img - numpy array of the image to be plotted (used for the size shown in the title)
        fs - first or second shown on the page (0/1)
        Returns
        -------
        """        
        pos = (self.j + fs) % len(self.currIdxs)
        self.img = self.currImgs[pos]
        axis.set_title('Image {}: {}\nImage Size: {}'.format("1" if fs == 0 else "2", name, img.shape), fontsize=14)

        # Red border marks an image selected for deletion, white an unselected one.
        colour = (255, 0, 0) if self.currIdxsSelected[pos] else (255, 255, 255)
        # imshow adds a new image artist on every call; remove the previous
        # ones so that paging back and forth does not accumulate hidden images.
        for oldImage in list(axis.images):
            oldImage.remove()
        axis.imshow(cv2.copyMakeBorder(self.img, 20, 20, 20, 20, cv2.BORDER_CONSTANT, value=colour))
        self.fig.canvas.draw_idle()
        
    def plotImgs(self, firstIdx: int, secondIdx: int):  
        """
        Parameters - 
        firstIdx/secondIdx  - Indices of first/second image in currX variables 
        Returns
        -------
        """          
        self.plotImg(self.axes[0], self.currNames[firstIdx], self.currImgs[firstIdx], 0)
        self.plotImg(self.axes[1], self.currNames[secondIdx], self.currImgs[secondIdx], 1)
        
        self.fig.subplots_adjust(wspace=0.25, hspace=0.25)
        self.fig.suptitle('Choose which images you want to delete', fontsize=24, fontweight = "bold")
        plt.show()   

    def deleteImgs(self):
        """
        Delete from disk every file of the current group whose selection flag is set.

        A file that cannot be removed is reported on stdout and skipped; the
        remaining files are still processed.

        Returns
        -------
        """
        for path, selected in zip(self.currPaths, self.currIdxsSelected):
            if not selected:
                continue
            try:
                os.remove(path)
            except OSError as err:
                # Only filesystem failures are expected here; report the reason
                # instead of hiding it.
                print("Failed to remove path %s (%s)" % (path, err))
            else:
                print("Successfully deleted path %s" % (path))    
                                                                                              
    def next(self, event):
        """
        Move two images forward within the current group (wrapping around).

        Parameters - 
        event  - Automatically passed in the callback function (but not used) 
        Returns
        -------
        """          
        self.j = (self.j + 2) % len(self.currIdxs)
        self.plotImgs(self.j, (self.j + 1) % len(self.currIdxs))
        
    def prev(self, event):
        """
        Move two images backward within the current group (wrapping around).

        Parameters - 
        event  - Automatically passed in the callback function (but not used) 
        Returns
        -------
        """         
        self.j = (self.j - 2) % len(self.currIdxs)
        self.plotImgs(self.j, (self.j + 1) % len(self.currIdxs))        
        
    def deleteContinue(self, event):
        """
        Delete the selected images of the current group, then show the next group
        or close the figure when the last group has been handled.

        Parameters - 
        event  - Automatically passed in the callback function (but not used) 
        Returns
        -------
        """ 
        self.deleteImgs()
        if self.i < self.numKeys - 1:
            self.i += 1
            self.getCurrFiles(self.i)
            self.j = 0
            self.plotImgs(self.j, (self.j + 1) % len(self.currIdxs))
        else:
            plt.close(self.fig)
            print("All duplicates marked as deleted have been removed")

### <ins>3. User Specifications</ins>

#### <ins>Option 1 - Single directory</ins>

Run the cells below to find duplicates within a single directory.

In [13]:
# Replace this with your chosen directory (raw string, so backslashes are kept literally).
path = r"path\to\directory"
# Two images are reported as duplicates when their similarity is strictly
# greater than this value.
threshold = 0.92

In [14]:
# In the single-directory case the function returns the same path list twice,
# so firstDirPaths and secondDirPaths refer to the same files.
dups, firstDirPaths, secondDirPaths = getDuplicatesOneDirectory(path, threshold)
# Bare expression: shown as the cell's output in a notebook.
dups

Loading CLIP Model...


ftfy or spacy is not installed using BERT BasicTokenizer instead of ftfy.


HBox(children=(HTML(value='Batches'), FloatProgress(value=0.0, max=1.0), HTML(value='')))




{0: {1: 0.9784534573554993}}

In [15]:
if __name__ == "__main__":
    if not dups:
        print("No duplicate images found") 
    else:
        # Viewer starts on the first duplicate group, first pair of images.
        v = Visualise(dups, firstDirPaths, secondDirPaths, i=0, j=0)

        # Button strip below the images: [Previous] [Delete & continue] [Next].
        axprev = plt.axes([0.39, 0.08, 0.07, 0.03])
        axnext = plt.axes([0.55, 0.08, 0.07, 0.03])
        axdel = plt.axes([0.47, 0.08, 0.07, 0.03])

        # The Button objects are bound to names so they stay referenced
        # while the figure is open.
        bnext = Button(axnext, 'Next')
        bnext.on_clicked(v.next)
        bprev = Button(axprev, 'Previous')
        bprev.on_clicked(v.prev)
        bdel = Button(axdel, "Delete & continue")
        bdel.on_clicked(v.deleteContinue)

        # One click handler per image panel.
        ax1 = SelectImage(v, v.fig, v.axes[0], 0)
        ax2 = SelectImage(v, v.fig, v.axes[1], 1)

        plt.show()

#### <ins>Option 2 - Two directories</ins>

Run the cells below to find images in the second directory that duplicate images in the first directory.

In [67]:
# Replace these with your chosen directories (raw strings, so backslashes are kept literally).
firstDir = r"path\to\directory\1"
secondDir = r"path\to\directory\2"
# Two images are reported as duplicates when their similarity is strictly
# greater than this value.
threshold = 0.98

In [None]:
# Outer keys of dups index into firstDirPaths, inner keys into secondDirPaths.
dups, firstDirPaths, secondDirPaths = getDuplicatesTwoDirectories(firstDir, secondDir, threshold)
# Bare expression: shown as the cell's output in a notebook.
dups, firstDirPaths, secondDirPaths

In [None]:
if __name__ == "__main__":
    if not dups:
        print("No duplicate images found") 
    else:
        # Viewer starts on the first duplicate group, first pair of images.
        v = Visualise(dups, firstDirPaths, secondDirPaths, i=0, j=0)

        # Button strip below the images: [Previous] [Delete & continue] [Next].
        axprev = plt.axes([0.39, 0.08, 0.07, 0.03])
        axnext = plt.axes([0.55, 0.08, 0.07, 0.03])
        axdel = plt.axes([0.47, 0.08, 0.07, 0.03])

        # The Button objects are bound to names so they stay referenced
        # while the figure is open.
        bnext = Button(axnext, 'Next')
        bnext.on_clicked(v.next)
        bprev = Button(axprev, 'Previous')
        bprev.on_clicked(v.prev)
        bdel = Button(axdel, "Delete & continue")
        bdel.on_clicked(v.deleteContinue)

        # One click handler per image panel.
        ax1 = SelectImage(v, v.fig, v.axes[0], 0)
        ax2 = SelectImage(v, v.fig, v.axes[1], 1)

        plt.show()