In [1]:
from pydantic import BaseModel
from enum import Enum
from typing import Literal
import pandas as pd
import os
import math

DELETE_FILES = [".DS_Store"]
IGNORE_FOLDERS = [".git"]

class DuplicateGroup(BaseModel):
    file_md5: str 
    file_size_h: str
    file_count: int
    common_ancestor: str
    folder_file: list[tuple[str, str]]


class ResolutionCategory(Enum):
    same_folder_diff_name = "same_folder_diff_name"
    diff_folder_same_name = "diff_folder_same_name"
    
    
class Resolution(BaseModel):
    category: ResolutionCategory
    files: list[tuple[str, bool]]
    
    

In [2]:

def convert_size(size_bytes):
    """Convert bytes to a readable format (KB, MB, GB, etc.)."""
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size_bytes, 1024)))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return f"{s} {size_name[i]}"

def get_duplicate_volume(df):
    return convert_size(df[df.duplicated(subset='file_md5', keep='first')].file_size.sum())

In [3]:
df = pd.read_csv("../files_report.csv")
df = df[~df.isna().any(axis=1) & (df.file_size > 0)]


duplicates = df[df.duplicated(subset='file_md5', keep=False)].sort_values(by='file_size', ascending=False)
duplicates['file_size_h'] = duplicates['file_size'].apply(convert_size)
duplicates['folder'] = duplicates['file_path'].apply(lambda x: os.path.dirname(x))
dupl_records = duplicates.groupby(['file_md5', 'file_size_h'], group_keys=False).apply(lambda x: x[['file_path', 'folder']].to_dict(orient='records')).to_dict()

  dupl_records = duplicates.groupby(['file_md5', 'file_size_h'], group_keys=False).apply(lambda x: x[['file_path', 'folder']].to_dict(orient='records')).to_dict()


In [4]:
paths = list(duplicates.file_path.values)
folder_paths = [os.path.dirname(p) for p in paths]

In [5]:
# from graphviz import Digraph

# class TrieNode:
#     def __init__(self, key=""):
#         self.key = key
#         self.children = {}
#         self.is_end_of_path = False

# class Trie:
#     def __init__(self):
#         self.root = TrieNode()

#     def insert(self, path):
#         current = self.root
#         for part in path.split('/'):
#             if part not in current.children:
#                 current.children[part] = TrieNode(part)
#             current = current.children[part]
#         current.is_end_of_path = True

#     def compress(self):
#         def compress_node(node):
#             keys_to_compress = list(node.children.keys())
#             for key in keys_to_compress:
#                 child = node.children[key]
#                 while len(child.children) == 1 and not child.is_end_of_path:
#                     grandchild_key = next(iter(child.children))
#                     grandchild = child.children[grandchild_key]
#                     child.key += '/' + grandchild_key
#                     child.children = grandchild.children
#                     child.is_end_of_path = grandchild.is_end_of_path
#                 compress_node(child)
        
#         compress_node(self.root)

#     def display(self):
#         dot = Digraph()
#         dot.node('root', 'root', style='filled', fillcolor='lightgray')
        
#         def add_edges(node, parent_key):
#             for key, child in node.children.items():
#                 if '.' in key:  # Assuming base file names contain a dot (e.g., 'xyz.png')
#                     dot.node(child.key, child.key, style='filled', fillcolor='lightgray')
#                 else:
#                     dot.node(child.key, child.key)
#                 dot.edge(parent_key, child.key)
#                 add_edges(child, child.key)
        
#         add_edges(self.root, 'root')
#         return dot


# # paths = []


# trie = Trie()
# for path in paths:
#     trie.insert(path)

# print("Before compression:")
# dot = trie.display()
# dot.render('trie_before_compression', format='svg', cleanup=True)

# trie.compress()

# print("\nAfter compression:")
# dot = trie.display()
# dot.render('trie_after_compression', format='svg', cleanup=True)



In [6]:
import os
def find_common_prefix(paths):
    """Find the largest common prefix (base folder) in a list of paths."""
    if not paths:
        return ""
    
    # Split the paths into lists of directories
    split_paths = [p.split(os.sep) for p in paths]
    
    # Use zip to transpose the list of lists and iterate over corresponding parts
    common_parts = []
    for parts in zip(*split_paths):
        if all(part == parts[0] for part in parts):
            common_parts.append(parts[0])
        else:
            break
    
    # Join the common parts back into a single path
    return os.sep.join(common_parts)

In [7]:
## get all the duplicate groups
_dups = []
for ix, ((md5, size_h), records) in enumerate(dupl_records.items()):
    paths = [r['file_path'] for r in records]

    common_ancestor = find_common_prefix(paths)
    folder_and_file = [(r['folder'][len(common_ancestor):].lstrip(os.sep), os.path.basename(r['file_path']) )  for r in records]
    
    # how often these folders appear together
    _dups.append(DuplicateGroup(file_md5=md5, file_size_h=size_h, file_count=len(paths), common_ancestor=common_ancestor, folder_file=folder_and_file))
    

In [8]:
_dups[0]

DuplicateGroup(file_md5='00a989d9a58de3a7e294631061418125', file_size_h='1.43 MB', file_count=2, common_ancestor='/Users/arunpatro/My Drive/IIT - KGP/Acads/Semester 5/Power Electronics Lab', folder_file=[('NED mohan_PE/lab record', 'DSC02861.JPG'), ('Lab Record', 'DSC02861.JPG')])

In [9]:
def generate_deletion_list(dups: list[DuplicateGroup]) -> list[str]:
    """Generate a list of deletions for each duplicate group."""
    x = [dup.files for dup in _dd]
    x = list(itertools.chain(*x))
    df = pd.DataFrame(x, columns=['file_path', 'keep'])
    df['delete'] = ~df['keep']
    return df[df['delete']]['file_path'].tolist()

In [13]:
from collections import Counter
import itertools

# Initialize a Counter to store pairwise occurrences
pairwise_counts = Counter()

dup_folders = [(d.common_ancestor, d.folder_file) for d in _dups if any(f != "" for f, _ in d.folder_file)]
dup_folders = [(x, *[i[0] for i in y]) for x, y in dup_folders]

for acs, *folders in dup_folders:
    sorted_folders = sorted(folders)
    unique_pairs = [(acs, *sorted(pair)) for pair in itertools.combinations(sorted_folders, 2) if pair[0] != pair[1]]
    pairwise_counts.update(unique_pairs)

# Printing the pairwise occurrences
vals = []
for pair, count in pairwise_counts.most_common():
    vals.append((*pair, count))
    
pd.DataFrame(vals, columns=['common', 'folder1', 'folder2', 'count'])


Unnamed: 0,common,folder1,folder2,count
0,/Users/arunpatro/My Drive/IIT - KGP/Acads/Seme...,Lab Record,NED mohan_PE/lab record,46
1,/Users/arunpatro/My Drive/IIT - KGP/Acads/Seme...,Power EC/Power EC NPTEL,Power Electronics Lab/Power EC NPTEL,44
2,/Users/arunpatro/My Drive,Google Photos/2017,IIT - KGP/Acads/Instru Lab Expts/expt_6_photos,39
3,/Users/arunpatro/My Drive/IIT - KGP/Acads/BTP/...,backs,shankar-2,38
4,/Users/arunpatro/My Drive,Google Photos/2015,IIT - KGP/Acads/Semester 5/Quantum/3D + Quantum,22
...,...,...,...,...
107,/Users/arunpatro/My Drive/IIT - KGP/Acads/BTP/...,,refs/heads,1
108,/Users/arunpatro/My Drive,ARCHIVE/SOP Samples from Raunak,Graduate School Applications,1
109,/Users/arunpatro/My Drive,Books/Machine Learning,Myntra/pdfs,1
110,/Users/arunpatro/My Drive/IIT - KGP/Acads/MTP/...,figures,imgs,1


In [14]:
import hashlib
import os
import json
from collections import Counter

# Function to calculate MD5 hash of a file
def calculate_md5(file_path):
    hash_md5 = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
    except FileNotFoundError:
        return None
    return hash_md5.hexdigest()

# Function to recursively find all files and their MD5s in a directory
def find_all_files_with_md5(directory):
    file_md5s = set()
    for root, _, filenames in os.walk(directory):
        for filename in filenames:
            file_path = os.path.join(root, filename)
            file_md5 = calculate_md5(file_path)
            if file_md5:
                file_md5s.add(file_md5)
    return file_md5s

# Function to ask user for their preference
def get_user_preference(pair, count):
    base, folder1, folder2 = pair
    files1 = find_all_files_with_md5(os.path.join(base, folder1))
    files2 = find_all_files_with_md5(os.path.join(base, folder2))
    f1_total = len(files1)
    f2_total = len(files2)
    common_files = files1 & files2
    # assert len(common_files) == count
    # only_in_f1 = files1 - files2
    # only_in_f2 = files2 - files1
    
    clash_perc1 = len(common_files) / f1_total if f1_total > 0 else 0
    clash_perc2 = len(common_files) / f2_total if f2_total > 0 else 0
    
    choices = [(folder1, clash_perc1), (folder2, clash_perc2)]
    choices = sorted(choices, key=lambda x: x[1], reverse=True)
    chosen_folder = choices[0][0]
    
    output = f"""
Common: {base}
Clashes: {count}
Choosing "{choices[0][0]}" over "{choices[1][0]}"
--
F1: {folder1}
\tClashes: ({clash_perc1:.1%}) - {len(common_files)} / {f1_total}

F2: {folder2}
\tClashes: ({clash_perc2:.1%}) - {len(common_files)} / {f2_total}
"""
    print(output)
    
    return chosen_folder

# Iterate over the pairs and get user preferences
user_preferences = {}
for pair, count in pairwise_counts.items():
    preferred_folder = get_user_preference(pair, count)
    if preferred_folder:
        user_preferences[pair] = preferred_folder

# Save user preferences to a JSON file
with open('user_preferences.json', 'w') as f:
    json.dump(user_preferences, f, indent=4)

print("User preferences saved to user_preferences.json")



Common: /Users/arunpatro/My Drive/IIT - KGP/Acads/Semester 5/Power Electronics Lab
Clashes: 46
Choosing "NED mohan_PE/lab record" over "Lab Record"
--
F1: Lab Record
	Clashes: (31.5%) - 46 / 146

F2: NED mohan_PE/lab record
	Clashes: (37.1%) - 46 / 124



In [None]:
def resolve_same_folder(dupl_group) -> Resolution:
    """Resolve duplicates in the same folder."""
    # Check if the files are the same
    folder_files = dupl_group.folder_file
    file_names = [f for _, f in folder_files]
    files = sorted(file_names, key=lambda x: (len(x), x.lower()), reverse=True)
    files = [os.path.join(dupl_group.common_ancestor, f) for f in files]
    files = [(f, True) if f == files[-1] else (f, False) for f in files] # keep the last file
    return Resolution(files=files, category=ResolutionCategory.same_folder_diff_name)

def resolve_same_name(dupl_group) -> Resolution:
    """Resolve duplicates with the same name."""
    # Check if the files are the same
    folder_files = dupl_group.folder_file
    base_file = folder_files[0][1]
    folders = [f for f, _ in folder_files]
    folders = sorted(folders, key=lambda x: (len(x), x.lower()), reverse=True)
    folders = [os.path.join(dupl_group.common_ancestor, f) for f in folders]
    files = [os.path.join(f, base_file) for f in folders]
    files = [(f, True) if f == files[-1] else (f, False) for f in files] # keep the last file
    return None
    return Resolution(files=files, category=ResolutionCategory.diff_folder_same_name)

def resolve(group: DuplicateGroup) -> Resolution:
    if all([f == "" for f, _ in group.folder_file]):
        # all files are in the same folder
        return resolve_same_folder(group)
    elif len(set([f for _, f in group.folder_file])) == 1:
        # all files have the same name
        return resolve_same_name(group)
    
    return None
    

In [None]:
_dd = [resolve(d) for d in _dups if resolve(d) is not None]
xx = pd.DataFrame([dup.files for dup in _dd])


In [None]:

df = pd.concat()
df.columns = ['file_path', 'keep']
df['delete'] = ~df['keep']
return df[df['delete']]['file_path'].tolist()

In [None]:
_dupl2 = [d for d in _dups if not resolve(d)]

In [None]:
_dupl2

In [None]:
same_folder_dupls = [pd.DataFrame(resolve(d).files) for d in _dups if resolve(d) is not None]

In [None]:
df = pd.concat(same_folder_dupls)
df.columns = ['file_path', 'keep']
for x in (df[~df['keep']].file_path.values):
    print(x)

In [None]:
def prioritize_folders(folder_paths):
    """Return the arg idxs - Prioritize Shorter and more discriptive folder first."""
    return sorted(range(len(folder_paths)), key=lambda x: (len(folder_paths[x]), folder_paths[x]))

def duplicates_in_same_folder(records):
    """Find duplicates in the same folder."""

    if len(records) > 1:
        common_prefix = find_common_prefix([r['folder'] for r in records])
        folder_remainders = [r['folder'][len(common_prefix):].lstrip(os.sep) for r in records]
        file_names = [os.path.basename(r['file_path']) for r in records]
        if common_prefix and all([f == "" for f in folder_remainders]):
            # sorted by max length first and then reverse alphabetically
            

            df = pd.DataFrame(items)
            df.columns = ['File Name']
            df['Keep'] = False
            df.loc[df.index[-1], 'Keep'] = True
            return df
        
    return pd.DataFrame()

from pydantic import BaseModel
from typing import Literal

class Resolution(BaseModel):
    """Resolution for duplicates."""
    resolution: pd.DataFrame
    category: Literal['same_folder', 'different_folders', 'None']
    
    def __bool__(self):
        return not self.resolution.empty
    
    
def resolve_duplicates(records) -> Resolution:
    
    if len(records) > 1:
        """Resolve duplicates in the same folder."""
        common_prefix = find_common_prefix([r['folder'] for r in records])
        folder_remainders = [r['folder'][len(common_prefix):].lstrip(os.sep) for r in records]
        file_names = [os.path.basename(r['file_path']) for r in records]
        if common_prefix and all([f == "" for f in folder_remainders]):
            # sorted by max length first and then reverse alphabetically
            items = sorted(file_names, key=lambda x: (len(x), x.lower()), reverse=True)

            df = pd.DataFrame(items)
            df.columns = ['File Name']
            df['Keep'] = False
            df.loc[df.index[-1], 'Keep'] = True
            return Resolution(resolution=df, category='same_folder')
    
    
        
    return Resolution(resolution=pd.DataFrame(), category='None')
        
def duplicates_in_different_folders(records):
    
    if len(records) > 2:
        ## files could be spread across multiple folders and in each folder there could be multiple duplicates
        ## group by folder and then remove duplicates in each folder
        df = pd.DataFrame(records)
        df = df.groupby('folder').apply(resolve_duplicates)
        
        
        
    return pd.DataFrame()
        
        

In [None]:
_dups

In [None]:
# Iterate over the groups and print the paths with highlighted common base folder
for ix, ((md5, size_h), records) in enumerate(dupl_records.items()):
    md5_short = md5[:6]

    
    # _resolution = resolve_duplicates(records)
    
    # Print MD5 hash, size, and common prefix
    print(f"{ix+1}.")
    print(f"MD5: {md5_short} Size: {size_h}")
    print(f"Common: {nearest_ancestor}")

    # _action_df = duplicates_in_same_folder(v)
    # if not _action_df.empty:
    #     print(f"""--
    # Reason: Same folder duplicate
    # Details:
    # - Common Folder: {common_prefix}
    # - N Duplicate Files: {len(_action_df)-1}
    # {_action_df}
    # --""")
    for i, (folder, filepath) in enumerate(folder_and_file):
        # Print the details
        print(f"  {i+1}. Folder: {folder}")
        print(f"     File: {filepath}")
    print("--")



In [None]:
agg = duplicates.groupby('file_md5').agg({'folder': list}).reset_index()

# grouped_duplicates = duplicates.sort_values(by='file_size', ascending=False).groupby('file_md5')

# # Aggregate base folders into a list
# aggregated_duplicates = grouped_duplicates.agg({
#     'file_path': list,
#     'file_size': 'sum',
#     'base_folder': lambda x: list(set(x))
# })

In [None]:
duplicates.to_csv("duplicates.csv", index=False)

In [None]:
grouped_duplicates = duplicates.sort_values(by='file_size', ascending=False).groupby('file_md5')

# Iterate over groups and display file paths and sizes
for md5, group in grouped_duplicates:
    print(f"\nMD5 Hash: {md5}")
    print(f"Total Size: {convert_size(group['file_size'].sum())}")
    print("File Paths:")
    for idx, row in group.iterrows():
        print(f"  {row['file_path']} - {row['file_size_h']}")