Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 62 additions & 41 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,34 @@
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import sys
from tqdm import tqdm

MIN_FILE_SIZE = 100 * 1024 # 100 KB
DEFAULT_TOP_N = 10

# Use Path objects for cross-platform compatibility
SKIP_DIRS = {
# Linux
"/dev",
"/proc",
"/sys",
"/run",
"/var/run",
"/snap",
"/boot",
"/lost+found",
Path("/dev"),
Path("/proc"),
Path("/sys"),
Path("/run"),
Path("/var/run"),
Path("/snap"),
Path("/boot"),
Path("/lost+found"),
# macOS
"/System",
"/Library",
"/private/var",
"/.Spotlight-V100",
"/.DocumentRevisions-V100",
"/.fseventsd",
Path("/System"),
Path("/Library"),
Path("/private/var"),
Path("/.Spotlight-V100"),
Path("/.DocumentRevisions-V100"),
Path("/.fseventsd"),
}
DEEPEST_SKIP_DIRS_LEVEL = max(dirpath.count("/") for dirpath in SKIP_DIRS)
# Calculate deepest level for optimization
DEEPEST_SKIP_DIRS_LEVEL = max(len(p.parts) for p in SKIP_DIRS)

CATEGORIES = {
"Pictures": {
".jpg",
Expand Down Expand Up @@ -92,8 +96,8 @@ def get_disk_usage(path):
return total, used, free


def categorize_file(filepath: str) -> str:
suffix = "." + filepath.split(".")[-1].lower()
def categorize_file(filepath: Path) -> str:
suffix = filepath.suffix.lower()

for category, extensions in CATEGORIES.items():
if suffix in extensions:
Expand All @@ -102,19 +106,20 @@ def categorize_file(filepath: str) -> str:
return "Others"


def should_skip_directory(dirpath: str) -> bool:
def should_skip_directory(dirpath: Path) -> bool:
"""Check if directory should be skipped (system directories)."""
return dirpath in SKIP_DIRS


def identify_special_dir(dirpath: str) -> Optional[str]:
def identify_special_dir(dirpath: Path) -> Optional[str]:
"""
Check if directory is a special type that should be treated as atomic unit.
Returns category name if special, None otherwise.
"""
dir_name = dirpath.split("/")[-1].lower()
dir_name = dirpath.name.lower()

# Check for macOS .app bundles
if dirpath.endswith(".app"):
if dirpath.suffix == ".app":
return "macOS Apps"

# Check special directory names
Expand All @@ -125,7 +130,7 @@ def identify_special_dir(dirpath: str) -> Optional[str]:
return None


def calculate_dir_size(dirpath: str) -> int:
def calculate_dir_size(dirpath: Path) -> int:
"""
Calculate total size of directory using os.scandir (efficient and portable).
"""
Expand All @@ -139,7 +144,7 @@ def calculate_dir_size(dirpath: str) -> int:
stat.st_blocks * 512 if hasattr(stat, "st_blocks") else stat.st_size
)
elif entry.is_dir(follow_symlinks=False):
total_size += calculate_dir_size(entry.path)
total_size += calculate_dir_size(Path(entry.path))

except (FileNotFoundError, PermissionError, OSError):
continue
Expand All @@ -151,7 +156,7 @@ def calculate_dir_size(dirpath: str) -> int:

def scan_files_and_dirs(
path: Path, used: int, min_size: int = MIN_FILE_SIZE
) -> Tuple[Dict[str, List[Tuple[int, str]]], Dict[str, List[Tuple[int, str]]], int, int]:
) -> Tuple[Dict[str, List[Tuple[int, Path]]], Dict[str, List[Tuple[int, Path]]], int, int]:
"""
Scan directory tree for files and special directories.
Returns: (file_categories, dir_categories, total_files, total_size)
Expand All @@ -162,22 +167,27 @@ def scan_files_and_dirs(
scanned_size = 0
progress_update_buffer = 0

starting_level = len(path.parts)
is_skip_dirs = True if starting_level < DEEPEST_SKIP_DIRS_LEVEL else False

with tqdm(total=used, unit="B", unit_scale=True, desc="Scanning") as pbar:
for level, (root, dirs, files) in enumerate(os.walk(path, topdown=True)):
if level == 0:
root = root.rstrip("/")
starting_level = root.count("/")
for root, dirs, files in os.walk(path, topdown=True):
root_path = Path(root)

# Check subdirectories and handle special ones BEFORE descending
dirs_to_remove = []
for dirname in dirs:
dir_path = f"{root}/{dirname}"

# Skip system directories
if starting_level < DEEPEST_SKIP_DIRS_LEVEL:
if should_skip_directory(dir_path):
dirs_to_remove.append(dirname)
continue
dir_path = root_path / dirname

# Skip system directories (only check if we're not too deep)
if (
is_skip_dirs
and len(dir_path.parts) <= DEEPEST_SKIP_DIRS_LEVEL
and should_skip_directory(dir_path)
):
print(f"Skipping system directory: {dir_path}")
dirs_to_remove.append(dirname)
continue

# Check if this subdirectory is special
special_type = identify_special_dir(dir_path)
Expand All @@ -197,13 +207,13 @@ def scan_files_and_dirs(

# Process files in current directory
for name in files:
filepath = f"{root}/{name}"
filepath = root_path / name
try:
stat = os.stat(filepath)
size = stat.st_blocks * 512 if hasattr(stat, "st_blocks") else stat.st_size

if size >= min_size:
category = categorize_file(name)
category = categorize_file(filepath)
file_categories[category].append((size, filepath))

scanned_files += 1
Expand Down Expand Up @@ -319,6 +329,10 @@ def main():
print(f"❌ Error: Path '{scan_path}' does not exist")
return

if not scan_path.is_dir():
print(f"❌ Error: Path '{scan_path}' is not a directory")
sys.exit(1)

# Display disk usage
total, used, free = map(float, get_disk_usage(str(scan_path)))
terminal_width = shutil.get_terminal_size().columns
Expand All @@ -333,9 +347,16 @@ def main():
print()

# Scan files and directories
file_cats, dir_cats, total_files, total_size = scan_files_and_dirs(
scan_path, used, args.min_size * 1024
)
try:
file_cats, dir_cats, total_files, total_size = scan_files_and_dirs(
scan_path, used, args.min_size * 1024
)
except KeyboardInterrupt:
print("\n⚠️ Scan interrupted by user")
sys.exit(1)
except Exception as e:
print(f"❌ Error during scan: {e}")
sys.exit(1)

# Get top N for each category
top_files = get_top_n_per_category(file_cats, top_n=args.top)
Expand Down
Loading
Loading