In [None]:
import os
from enum import Enum
from typing import List, Dict, Union, Tuple, Optional
from pydantic import BaseModel


class FileType(str, Enum):
    """Coarse file categories used to bucket files by extension.

    Members also subclass ``str``, so each member compares equal to its
    string value (e.g. ``FileType.IMAGE == "Image"``).
    """

    IMAGE = "Image"
    VIDEO = "Video"
    TEXT = "Text"
    AUDIO = "Audio"
    ARCHIVE = "Archive"
    DOCUMENT = "Document"
    # Fallback returned by DirectoryFileOrganizer.get_file_type when the
    # extension matches none of the known sets.
    OTHER = "Other"


class FileMetadata(BaseModel):
    """Filesystem-level metadata shared by every file category.

    Base class of the category-specific metadata models in this file.
    """

    # File size as reported by os.path.getsize.
    size: int
    # Timestamps are numeric: every extractor in this file fills them from
    # os.path.getctime / os.path.getmtime, and the tests compare them to
    # numbers. Declaring them as ``str`` either rejected those values or
    # silently stringified them, depending on the pydantic version.
    created_at: Optional[float] = None
    modified_at: Optional[float] = None


class ImageMetadata(FileMetadata):
    """Metadata for image files; adds pixel-level details to FileMetadata."""

    # ImageMetadataExtractor fills these from PIL's Image.size, Image.mode
    # and Image.format respectively.
    resolution: Optional[Tuple[int, int]] = None
    color_mode: Optional[str] = None
    format: Optional[str] = None


class VideoMetadata(FileMetadata):
    """Metadata for video files; adds stream details to FileMetadata."""

    # VideoMetadataExtractor reads duration and bitrate from the "format"
    # section of ffmpeg.probe's output and the remaining fields from the
    # first stream whose codec_type is "video".
    duration: Optional[float] = None
    # Built as (width, height) by VideoMetadataExtractor.
    resolution: Optional[Tuple[int, int]] = None
    framerate: Optional[float] = None
    codec: Optional[str] = None
    bitrate: Optional[int] = None


class TextMetadata(FileMetadata):
    """Metadata for text files; adds content statistics to FileMetadata."""

    # Count of whitespace-separated tokens (TextMetadataExtractor uses
    # len(text.split())).
    num_words: Optional[int] = None
    # Value returned by langdetect's detect() for the decoded text.
    language: Optional[str] = None
    # Encoding name reported by chardet.detect for the raw bytes.
    encoding: Optional[str] = None


class AudioMetadata(FileMetadata):
    """Metadata for audio files; adds stream details to FileMetadata."""

    # AudioMetadataExtractor reads bitrate and duration from the "format"
    # section of ffmpeg.probe's output and the remaining fields from the
    # first stream whose codec_type is "audio".
    bitrate: Optional[int] = None
    duration: Optional[float] = None
    sample_rate: Optional[int] = None
    channels: Optional[int] = None
    codec: Optional[str] = None


class ArchiveMetadata(FileMetadata):
    """Metadata for archive files; adds container details to FileMetadata."""

    # Number of names listed in the archive (ArchiveMetadataExtractor uses
    # len(namelist()) for zip and len(getnames()) for tar).
    num_files: Optional[int] = None
    # Container kind as set by ArchiveMetadataExtractor: "zip" or "tar".
    compression_type: Optional[str] = None
    # For zip: True when any entry has bit 0 of flag_bits set; always False
    # for tar in ArchiveMetadataExtractor.
    encrypted: Optional[bool] = None


class DocumentMetadata(FileMetadata):
    """Metadata for office/PDF documents; adds document properties to FileMetadata."""

    # Meaning depends on the source format in DocumentMetadataExtractor:
    # page count for PDF, slide count for PPT/PPTX, and the number of
    # paragraphs (not pages) for DOC/DOCX.
    num_pages: Optional[int] = None
    author: Optional[str] = None
    title: Optional[str] = None
    language: Optional[str] = None


class DirectoryFileOrganizer:
    """Stateless helpers to list, classify and group files by extension.

    All methods are static; the class only serves as a namespace for the
    extension tables and the helpers that use them.
    """

    IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "bmp", "gif"}
    VIDEO_EXTENSIONS = {"mp4", "avi", "mov", "mkv"}
    # "doc", "docx" and "pdf" used to be listed here as well as in
    # DOCUMENT_EXTENSIONS. Because TEXT is checked first, those files could
    # never be classified as DOCUMENT and were routed to the plain-text
    # handling instead. The sets are now disjoint.
    TEXT_EXTENSIONS = {"txt", "md"}
    AUDIO_EXTENSIONS = {"mp3", "wav", "aac", "flac"}
    ARCHIVE_EXTENSIONS = {"zip", "rar", "tar", "gz"}
    DOCUMENT_EXTENSIONS = {"pdf", "doc", "docx", "ppt", "pptx"}

    @staticmethod
    def _extension_of(file_path: str) -> str:
        """Return the lower-cased extension of ``file_path`` without the dot.

        Uses ``os.path.splitext`` so that dots in directory names and files
        without any extension yield ``""`` instead of a path fragment.
        """
        return os.path.splitext(file_path)[1].lstrip(".").lower()

    @staticmethod
    def get_file_type(file_extension: str) -> FileType:
        """Map a file extension to its ``FileType``.

        Args:
            file_extension: Extension with or without dots, any case
                (``"JPG"``, ``".jpg"`` and ``"jpg"`` are equivalent).

        Returns:
            The matching ``FileType``, or ``FileType.OTHER`` when the
            extension is not in any of the class-level extension sets.
        """
        normalized = file_extension.lower().replace(".", "")
        # Looked up at call time so changes to the class attributes are honoured.
        lookup = (
            (FileType.IMAGE, DirectoryFileOrganizer.IMAGE_EXTENSIONS),
            (FileType.VIDEO, DirectoryFileOrganizer.VIDEO_EXTENSIONS),
            (FileType.TEXT, DirectoryFileOrganizer.TEXT_EXTENSIONS),
            (FileType.AUDIO, DirectoryFileOrganizer.AUDIO_EXTENSIONS),
            (FileType.ARCHIVE, DirectoryFileOrganizer.ARCHIVE_EXTENSIONS),
            (FileType.DOCUMENT, DirectoryFileOrganizer.DOCUMENT_EXTENSIONS),
        )
        for file_type, extensions in lookup:
            if normalized in extensions:
                return file_type
        return FileType.OTHER

    @staticmethod
    def list_files_recursive(directory_path: str) -> List[str]:
        """Return the paths of all files found under ``directory_path``.

        Paths are built by joining each directory visited by ``os.walk``
        with the file names it reports.
        """
        return [
            os.path.join(root, file_name)
            for root, _, file_names in os.walk(directory_path)
            for file_name in file_names
        ]

    @staticmethod
    def extract_metadata(file_paths: List[str]) -> List[Tuple[str, FileMetadata]]:
        """Pair every path with a metadata object matching its file type.

        Only ``size`` is read from disk (``os.path.getsize``); the
        type-specific field of each model is a hard-coded placeholder value,
        not something extracted from the file.

        Args:
            file_paths: Paths of existing files.

        Returns:
            ``(path, metadata)`` tuples in input order. ``metadata`` is the
            ``FileMetadata`` subclass for the detected type, or a plain
            ``FileMetadata`` for ``FileType.OTHER``.
        """
        # Placeholder builders: one per file type, keyed for direct dispatch.
        placeholder_builders = {
            FileType.IMAGE: lambda size: ImageMetadata(
                size=size, resolution=(1920, 1080)
            ),
            FileType.VIDEO: lambda size: VideoMetadata(size=size, duration=120.0),
            FileType.TEXT: lambda size: TextMetadata(size=size, num_words=1000),
            FileType.AUDIO: lambda size: AudioMetadata(size=size, bitrate=320),
            FileType.ARCHIVE: lambda size: ArchiveMetadata(size=size, num_files=10),
            FileType.DOCUMENT: lambda size: DocumentMetadata(size=size, num_pages=50),
        }

        file_metadata_list = []
        for file_path in file_paths:
            file_type = DirectoryFileOrganizer.get_file_type(
                DirectoryFileOrganizer._extension_of(file_path)
            )
            file_size = os.path.getsize(file_path)
            build = placeholder_builders.get(file_type)
            metadata = build(file_size) if build else FileMetadata(size=file_size)
            file_metadata_list.append((file_path, metadata))
        return file_metadata_list

    @staticmethod
    def group_files_by_type(
        files: Union[List[Tuple[str, FileMetadata]], List[str]]
    ) -> Union[
        Dict[FileType, List[Tuple[str, FileMetadata]]],
        Dict[FileType, List[str]],
    ]:
        """Bucket paths, or ``(path, metadata)`` pairs, by detected file type.

        Args:
            files: Either plain path strings or ``(path, metadata)`` pairs.

        Returns:
            A dict with one key per ``FileType`` member (empty list when no
            file of that type was given). Values hold the entries in input
            order, in the same form they were passed in.
        """
        grouped_files = {file_type: [] for file_type in FileType}

        for entry in files:
            if isinstance(entry, str):
                file_path, grouped_entry = entry, entry
            else:
                file_path, file_metadata = entry
                grouped_entry = (file_path, file_metadata)
            file_type = DirectoryFileOrganizer.get_file_type(
                DirectoryFileOrganizer._extension_of(file_path)
            )
            grouped_files[file_type].append(grouped_entry)

        return grouped_files


# Example usage

# Walk the sample directory, attach (placeholder) metadata to every file and
# bucket the resulting (path, metadata) pairs by detected file type.
directory_path = "../../files/iPhone 13/"
paths = DirectoryFileOrganizer.list_files_recursive(directory_path)
files = DirectoryFileOrganizer.extract_metadata(paths)
grouped_files = DirectoryFileOrganizer.group_files_by_type(files)
# Bare expression: presumably left as the last statement so the notebook
# cell displays the grouped result.
grouped_files
# for file_type, files in grouped_files.items():
#     print(f"{file_type.value} files:")
#     for file_path, file_metadata in files:
#         print(f"  {file_path}: {file_metadata}")

# Metadata Extractors


In [None]:
from abc import ABC, abstractmethod

from PIL import Image
import ffmpeg
import imagehash
import os
import mimetypes
import magic

import chardet
from langdetect import detect


class MetadataExtractor(ABC):
    """Interface implemented by the per-file-type metadata extractors."""

    @abstractmethod
    def extract_metadata(self, file_path: str) -> FileMetadata:
        """Return the metadata for the file at ``file_path``.

        Concrete extractors in this file return the ``FileMetadata``
        subclass that matches the file type they handle.
        """
        pass


class ImageMetadataExtractor(MetadataExtractor):
    """Extract pixel-level and filesystem metadata from an image file."""

    def extract_metadata(self, file_path: str) -> ImageMetadata:
        """Read resolution, colour mode, format, size and timestamps.

        Args:
            file_path: Path of the image to inspect.

        Returns:
            An ``ImageMetadata`` populated from PIL and ``os.path``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the image cannot be opened, or if size or
                timestamps cannot be read; the original error is chained
                as ``__cause__``.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            image = Image.open(file_path)
            try:
                resolution = image.size
                color_mode = image.mode
                image_format = image.format
            finally:
                # Release the underlying file handle. Explicit close() rather
                # than ``with`` so the attributes are read from the very
                # object Image.open returned.
                image.close()
        except Exception as e:
            raise ValueError(f"Error opening image file: {e}") from e

        try:
            size = os.path.getsize(file_path)
        except OSError as e:
            raise ValueError(f"Error getting file size: {e}") from e

        try:
            created_at = os.path.getctime(file_path)
        except OSError as e:
            raise ValueError(f"Error getting file creation time: {e}") from e

        try:
            modified_at = os.path.getmtime(file_path)
        except OSError as e:
            raise ValueError(f"Error getting file modification time: {e}") from e

        return ImageMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            resolution=resolution,
            color_mode=color_mode,
            format=image_format,
        )


class VideoMetadataExtractor(MetadataExtractor):
    """Extract stream and filesystem metadata from a video file via ffprobe."""

    def extract_metadata(self, file_path: str) -> VideoMetadata:
        """Probe ``file_path`` and return its ``VideoMetadata``.

        Individual fields that are missing or unparsable in the probe
        output are reported as ``None`` instead of failing the extraction.

        Args:
            file_path: Path of the video to inspect.

        Returns:
            A ``VideoMetadata`` built from ``ffmpeg.probe`` and ``os.path``.

        Raises:
            RuntimeError: For any failure (missing file, probe error, no
                video stream, ...); the original error is chained as
                ``__cause__``.
        """
        from fractions import Fraction

        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")

            size = os.path.getsize(file_path)
            created_at = os.path.getctime(file_path)
            modified_at = os.path.getmtime(file_path)

            try:
                probe = ffmpeg.probe(file_path)
            except ffmpeg.Error as e:
                raise ValueError(
                    f"Error probing video file: {e.stderr.decode()}"
                ) from e

            video_stream = next(
                (
                    stream
                    for stream in probe["streams"]
                    if stream.get("codec_type") == "video"
                ),
                None,
            )

            if video_stream is None:
                raise ValueError("No video stream found in file")

            try:
                duration = float(probe["format"]["duration"])
            except (KeyError, ValueError, TypeError):
                duration = None

            try:
                resolution = (int(video_stream["width"]), int(video_stream["height"]))
            except (KeyError, ValueError, TypeError):
                # The model field is Optional[Tuple[int, int]]; a (None, None)
                # tuple would not validate, so "unknown" is a plain None.
                resolution = None

            try:
                # The rate is given as a ratio string such as "30/1". Parse it
                # instead of eval()-ing text that comes from the probed file;
                # a zero denominator ("0/0") means the rate is unknown.
                framerate = float(Fraction(video_stream["r_frame_rate"]))
            except (KeyError, ValueError, TypeError, ZeroDivisionError):
                framerate = None

            codec = video_stream.get("codec_name")

            try:
                bitrate = int(probe["format"]["bit_rate"])
            except (KeyError, ValueError, TypeError):
                bitrate = None

            return VideoMetadata(
                size=size,
                created_at=created_at,
                modified_at=modified_at,
                duration=duration,
                resolution=resolution,
                framerate=framerate,
                codec=codec,
                bitrate=bitrate,
            )
        except Exception as e:
            raise RuntimeError(f"Failed to extract video metadata: {str(e)}") from e


class TextMetadataExtractor(MetadataExtractor):
    """Extract word count, language, encoding and filesystem metadata from text."""

    def extract_metadata(self, file_path: str) -> TextMetadata:
        """Read the whole file, detect its encoding and summarise its content.

        Args:
            file_path: Path of the text file to inspect.

        Returns:
            A ``TextMetadata``. ``encoding`` is whatever ``chardet.detect``
            reported (possibly ``None``); ``language`` is ``None`` when the
            file contains no non-whitespace text.

        Raises:
            OSError: If the file cannot be stat-ed or read.
            RuntimeError: If language detection fails on non-blank text.
        """
        size = os.path.getsize(file_path)
        created_at = os.path.getctime(file_path)
        modified_at = os.path.getmtime(file_path)

        with open(file_path, "rb") as file:
            raw_data = file.read()

        encoding = chardet.detect(raw_data)["encoding"]
        # The detector may report no encoding (e.g. for an empty file) or a
        # wrong one. Fall back to UTF-8 and replace undecodable bytes rather
        # than crash on decode(None) or a UnicodeDecodeError.
        text = raw_data.decode(encoding or "utf-8", errors="replace")

        num_words = len(text.split())
        # Nothing to detect in blank text; skip the detector instead of
        # letting it raise.
        language = self.detect_language(text) if text.strip() else None

        return TextMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            num_words=num_words,
            language=language,
            encoding=encoding,
        )

    def detect_language(self, text: str) -> str:
        """Return the language reported by ``langdetect.detect`` for ``text``.

        Raises:
            RuntimeError: If detection fails; the original error is chained
                as ``__cause__``.
        """
        try:
            language = detect(text)
        except Exception as e:
            raise RuntimeError(f"Failed to detect language: {str(e)}") from e
        return language


import os
import ffmpeg
from typing import Optional


class AudioMetadataExtractor(MetadataExtractor):
    """Extract stream and filesystem metadata from an audio file via ffprobe."""

    def extract_metadata(self, file_path: str) -> AudioMetadata:
        """Probe ``file_path`` and return its ``AudioMetadata``.

        Numeric fields missing from the probe output default to ``0`` /
        ``0.0`` and a missing codec name to ``"unknown"``; values that are
        present but unparsable become ``None``.

        Args:
            file_path: Path of the audio file to inspect.

        Returns:
            An ``AudioMetadata`` built from ``ffmpeg.probe`` and ``os.path``.

        Raises:
            RuntimeError: For any failure (missing file, probe error, no
                audio stream, ...); the original error is chained as
                ``__cause__``.
        """
        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")

            size = os.path.getsize(file_path)
            created_at = os.path.getctime(file_path)
            modified_at = os.path.getmtime(file_path)

            try:
                probe = ffmpeg.probe(file_path)
            except ffmpeg.Error as e:
                raise ValueError(
                    f"Error probing audio file: {e.stderr.decode()}"
                ) from e

            format_info = probe.get("format", {})
            audio_stream = next(
                (
                    stream
                    for stream in probe.get("streams", [])
                    if stream.get("codec_type") == "audio"
                ),
                None,
            )

            if audio_stream is None:
                raise ValueError("No audio stream found in file")

            # dict.get never raises KeyError, so only conversion errors are
            # handled below.
            try:
                bitrate = int(format_info.get("bit_rate", 0))
            except (ValueError, TypeError):
                bitrate = None

            try:
                duration = float(format_info.get("duration", 0.0))
            except (ValueError, TypeError):
                duration = None

            try:
                sample_rate = int(audio_stream.get("sample_rate", 0))
            except (ValueError, TypeError):
                sample_rate = None

            try:
                channels = int(audio_stream.get("channels", 0))
            except (ValueError, TypeError):
                channels = None

            codec = audio_stream.get("codec_name", "unknown")

            return AudioMetadata(
                size=size,
                created_at=created_at,
                modified_at=modified_at,
                bitrate=bitrate,
                duration=duration,
                sample_rate=sample_rate,
                channels=channels,
                codec=codec,
            )
        except Exception as e:
            raise RuntimeError(f"Failed to extract audio metadata: {str(e)}") from e


import os
import zipfile
import tarfile
from typing import Union


class ArchiveMetadataExtractor(MetadataExtractor):
    """Extract entry count, container kind and encryption flag from zip/tar files."""

    def extract_metadata(self, file_path: str) -> ArchiveMetadata:
        """Inspect a zip or tar archive without extracting it.

        Args:
            file_path: Path of the archive to inspect.

        Returns:
            An ``ArchiveMetadata`` whose ``compression_type`` is ``"zip"``
            or ``"tar"`` and whose ``num_files`` is the number of names
            listed in the archive.

        Raises:
            RuntimeError: If filesystem metadata cannot be read or the
                archive cannot be parsed; the original error is chained as
                ``__cause__``.
            ValueError: If the file is neither a zip nor a tar archive.
        """
        try:
            size = os.path.getsize(file_path)
            created_at = os.path.getctime(file_path)
            modified_at = os.path.getmtime(file_path)
        except (OSError, ValueError) as e:
            raise RuntimeError(
                f"Failed to retrieve file system metadata: {str(e)}"
            ) from e

        if zipfile.is_zipfile(file_path):
            try:
                with zipfile.ZipFile(file_path, "r") as archive:
                    num_files = len(archive.namelist())
                    compression_type = "zip"
                    # Bit 0 of the general-purpose flags marks an encrypted entry.
                    encrypted = any(
                        info.flag_bits & 0x1 for info in archive.infolist()
                    )
            except (zipfile.BadZipFile, RuntimeError) as e:
                raise RuntimeError(
                    f"Failed to extract ZIP archive metadata: {str(e)}"
                ) from e
        elif tarfile.is_tarfile(file_path):
            try:
                with tarfile.open(file_path, "r") as archive:
                    num_files = len(archive.getnames())
                    compression_type = "tar"
                    encrypted = False  # tar files do not support encryption by default
            except (tarfile.TarError, RuntimeError) as e:
                raise RuntimeError(
                    f"Failed to extract TAR archive metadata: {str(e)}"
                ) from e
        else:
            raise ValueError("Unsupported archive format")

        return ArchiveMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            num_files=num_files,
            compression_type=compression_type,
            encrypted=encrypted,
        )


import os
from typing import Union
from PyPDF2 import PdfFileReader
from pptx import Presentation
import docx

# Extensions accepted by DocumentMetadataExtractor.extract_metadata; holds the
# same values as DirectoryFileOrganizer.DOCUMENT_EXTENSIONS.
DOCUMENT_EXTENSIONS = {"pdf", "doc", "docx", "ppt", "pptx"}


class DocumentMetadataExtractor(MetadataExtractor):
    """Extract author, title, language and length from PDF and Office documents."""

    def extract_metadata(self, file_path: str) -> DocumentMetadata:
        """Dispatch to the format-specific reader based on the file extension.

        Args:
            file_path: Path of the document to inspect.

        Returns:
            A ``DocumentMetadata``. Missing author/title/language are
            reported as ``"Unknown"`` / ``"Untitled"`` / ``"Unknown"``.

        Raises:
            FileNotFoundError: If ``file_path`` is not an existing file.
            ValueError: If the extension is not in ``DOCUMENT_EXTENSIONS``.
            RuntimeError: If the underlying library fails to read the
                document; the original error is chained as ``__cause__``.
        """
        if not os.path.isfile(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        file_extension = os.path.splitext(file_path)[1].lstrip(".").lower()
        if file_extension not in DOCUMENT_EXTENSIONS:
            raise ValueError(f"Unsupported document format: {file_extension}")

        size = os.path.getsize(file_path)
        created_at = os.path.getctime(file_path)
        modified_at = os.path.getmtime(file_path)

        if file_extension == "pdf":
            return self._extract_pdf_metadata(file_path, size, created_at, modified_at)
        elif file_extension in {"doc", "docx"}:
            return self._extract_doc_metadata(file_path, size, created_at, modified_at)
        elif file_extension in {"ppt", "pptx"}:
            return self._extract_ppt_metadata(file_path, size, created_at, modified_at)
        else:
            # Only reachable if DOCUMENT_EXTENSIONS gains an entry without a
            # matching reader above.
            raise ValueError(f"Unsupported document format: {file_extension}")

    @staticmethod
    def _describe(properties) -> Tuple[str, str, str]:
        """Return ``(author, title, language)`` with defaults for empty values.

        ``properties`` may be ``None`` or lack any of the attributes; every
        missing or empty value is replaced by its default label.
        """
        author = getattr(properties, "author", None) or "Unknown"
        title = getattr(properties, "title", None) or "Untitled"
        language = getattr(properties, "language", None) or "Unknown"
        return author, title, language

    def _extract_pdf_metadata(
        self, file_path: str, size: int, created_at: float, modified_at: float
    ) -> DocumentMetadata:
        """Read the page count and document info of a PDF via PyPDF2."""
        try:
            with open(file_path, "rb") as f:
                reader = PdfFileReader(f)
                # The info object is handled even when it is None (no
                # document information available) by _describe.
                author, title, language = self._describe(reader.getDocumentInfo())
                num_pages = reader.getNumPages()
        except Exception as e:
            raise RuntimeError(f"Failed to extract PDF metadata: {str(e)}") from e

        return DocumentMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            num_pages=num_pages,
            author=author,
            title=title,
            language=language,
        )

    def _extract_doc_metadata(
        self, file_path: str, size: int, created_at: float, modified_at: float
    ) -> DocumentMetadata:
        """Read core properties of a Word document via python-docx.

        ``num_pages`` is the number of paragraphs, not a real page count.
        """
        try:
            doc = docx.Document(file_path)
            num_pages = len(doc.paragraphs)
            author, title, language = self._describe(doc.core_properties)
        except Exception as e:
            raise RuntimeError(
                f"Failed to extract DOC/DOCX metadata: {str(e)}"
            ) from e

        return DocumentMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            num_pages=num_pages,
            author=author,
            title=title,
            language=language,
        )

    def _extract_ppt_metadata(
        self, file_path: str, size: int, created_at: float, modified_at: float
    ) -> DocumentMetadata:
        """Read core properties of a presentation via python-pptx.

        ``num_pages`` is the number of slides.
        """
        try:
            presentation = Presentation(file_path)
            num_pages = len(presentation.slides)
            author, title, language = self._describe(presentation.core_properties)
        except Exception as e:
            raise RuntimeError(
                f"Failed to extract PPT/PPTX metadata: {str(e)}"
            ) from e

        return DocumentMetadata(
            size=size,
            created_at=created_at,
            modified_at=modified_at,
            num_pages=num_pages,
            author=author,
            title=title,
            language=language,
        )


class GenericMetadataExtractor:
    """Front door that routes a file to the extractor for its detected type."""

    def __init__(self):
        # One extractor instance per supported FileType; FileType.OTHER has
        # no entry and is handled by the filesystem-only fallback below.
        self.extractors = dict(
            [
                (FileType.IMAGE, ImageMetadataExtractor()),
                (FileType.VIDEO, VideoMetadataExtractor()),
                (FileType.TEXT, TextMetadataExtractor()),
                (FileType.AUDIO, AudioMetadataExtractor()),
                (FileType.ARCHIVE, ArchiveMetadataExtractor()),
                (FileType.DOCUMENT, DocumentMetadataExtractor()),
            ]
        )

    def extract_metadata(self, file_path: str) -> FileMetadata:
        """Return type-specific metadata, or plain ``FileMetadata`` as fallback.

        The file type is derived from the text after the last ``"."`` in
        ``file_path``. When no extractor is registered for that type, only
        size and timestamps from ``os.path`` are reported.
        """
        suffix = file_path.split(".")[-1].lower()
        detected_type = DirectoryFileOrganizer.get_file_type(suffix)
        handler = self.extractors.get(detected_type, None)

        if handler:
            return handler.extract_metadata(file_path)

        return FileMetadata(
            size=os.path.getsize(file_path),
            created_at=os.path.getctime(file_path),
            modified_at=os.path.getmtime(file_path),
        )

### Metadata Extractors Testing


In [None]:
import unittest
from unittest.mock import patch, mock_open

# Fixture paths handed to the extractors by TestMetadataExtractors. Most of
# the filesystem and library calls made on them are patched in the tests.
DUMMY_IMAGE_PATH = "../../files/test/dummy.jpg"
DUMMY_VIDEO_PATH = "../../files/test/dummy.mp4"
DUMMY_TEXT_PATH = "../../files/test/dummy.rtf"
DUMMY_AUDIO_PATH = "../../files/test/dummy.wav"
DUMMY_ARCHIVE_PATH = "../../files/test/dummy.zip"
DUMMY_DOCUMENT_PATH = "../../files/test/dummy.pdf"


class TestMetadataExtractors(unittest.TestCase):
    """Unit tests for the per-type metadata extractors.

    Filesystem calls and third-party readers are patched, so the tests
    exercise only the extractors' own field mapping.
    """

    def setUp(self):
        self.image_extractor = ImageMetadataExtractor()
        self.video_extractor = VideoMetadataExtractor()
        self.text_extractor = TextMetadataExtractor()
        self.audio_extractor = AudioMetadataExtractor()
        self.archive_extractor = ArchiveMetadataExtractor()
        self.document_extractor = DocumentMetadataExtractor()

    @patch("os.path.exists", return_value=True)
    @patch("os.path.getsize", return_value=1024)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("PIL.Image.open")
    def test_image_metadata_extraction(
        self, mock_open_image, mock_getmtime, mock_getctime, mock_getsize, mock_exists
    ):
        """Image attributes reported by PIL are copied into ImageMetadata."""
        mock_image = mock_open_image.return_value
        mock_image.size = (1920, 1080)
        mock_image.mode = "RGB"
        mock_image.format = "JPEG"

        metadata = self.image_extractor.extract_metadata(DUMMY_IMAGE_PATH)

        self.assertEqual(metadata.size, 1024)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.resolution, (1920, 1080))
        self.assertEqual(metadata.color_mode, "RGB")
        self.assertEqual(metadata.format, "JPEG")

    @patch("os.path.exists", return_value=True)
    @patch("os.path.getsize", return_value=2048)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("ffmpeg.probe")
    def test_video_metadata_extraction(
        self, mock_probe, mock_getmtime, mock_getctime, mock_getsize, mock_exists
    ):
        """Probe output for a video stream is mapped onto VideoMetadata."""
        mock_probe.return_value = {
            "streams": [
                {
                    "codec_type": "video",
                    "width": 1920,
                    "height": 1080,
                    "r_frame_rate": "30/1",
                    "codec_name": "h264",
                }
            ],
            "format": {"duration": "60.0", "bit_rate": "1000000"},
        }

        metadata = self.video_extractor.extract_metadata(DUMMY_VIDEO_PATH)

        self.assertEqual(metadata.size, 2048)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.duration, 60.0)
        self.assertEqual(metadata.resolution, (1920, 1080))
        self.assertEqual(metadata.framerate, 30)
        self.assertEqual(metadata.codec, "h264")
        self.assertEqual(metadata.bitrate, 1000000)

    @patch("os.path.exists", return_value=True)
    @patch("os.path.getsize", return_value=512)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("builtins.open", new_callable=mock_open, read_data=b"Hello World")
    @patch("chardet.detect", return_value={"encoding": "utf-8"})
    def test_text_metadata_extraction(
        self,
        mock_detect,
        mock_open_file,
        mock_getmtime,
        mock_getctime,
        mock_getsize,
        mock_exists,
    ):
        """Word count, language and encoding are derived from the file bytes."""
        with patch.object(self.text_extractor, "detect_language", return_value="en"):
            metadata = self.text_extractor.extract_metadata(DUMMY_TEXT_PATH)

        self.assertEqual(metadata.size, 512)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.num_words, 2)
        self.assertEqual(metadata.language, "en")
        self.assertEqual(metadata.encoding, "utf-8")

    @patch("os.path.exists", return_value=True)
    @patch("os.path.getsize", return_value=1024)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("ffmpeg.probe")
    def test_audio_metadata_extraction(
        self, mock_probe, mock_getmtime, mock_getctime, mock_getsize, mock_exists
    ):
        """Probe output for an audio stream is mapped onto AudioMetadata."""
        mock_probe.return_value = {
            "format": {"duration": "180.0", "bit_rate": "320000"},
            "streams": [
                {
                    "codec_type": "audio",
                    "sample_rate": "44100",
                    "channels": 2,
                    "codec_name": "mp3",
                }
            ],
        }
        metadata = self.audio_extractor.extract_metadata(DUMMY_AUDIO_PATH)
        self.assertEqual(metadata.size, 1024)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.duration, 180.0)
        self.assertEqual(metadata.bitrate, 320000)
        self.assertEqual(metadata.sample_rate, 44100)
        self.assertEqual(metadata.channels, 2)
        self.assertEqual(metadata.codec, "mp3")

    @patch("os.path.exists", return_value=True)
    @patch("os.path.getsize", return_value=4096)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("zipfile.is_zipfile", return_value=True)
    @patch("zipfile.ZipFile")
    def test_archive_metadata_extraction(
        self,
        mock_zipfile,
        mock_is_zipfile,
        mock_getmtime,
        mock_getctime,
        mock_getsize,
        mock_exists,
    ):
        """Entry count and encryption flag are read from the opened zip."""
        # The extractor uses ``with zipfile.ZipFile(...) as archive``, so the
        # object it works with is what __enter__ returns, not the ZipFile
        # instance itself. Configure that object.
        mock_zip = mock_zipfile.return_value.__enter__.return_value
        mock_zip.namelist.return_value = ["file1.txt", "file2.txt"]
        mock_zip.infolist.return_value = [
            zipfile.ZipInfo(filename="file1.txt"),
            zipfile.ZipInfo(filename="file2.txt"),
        ]
        metadata = self.archive_extractor.extract_metadata(DUMMY_ARCHIVE_PATH)
        self.assertEqual(metadata.size, 4096)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.num_files, 2)
        self.assertEqual(metadata.compression_type, "zip")
        self.assertFalse(metadata.encrypted)

    @patch("os.path.isfile", return_value=True)
    @patch("os.path.getsize", return_value=2048)
    @patch("os.path.getctime", return_value=1609459200)
    @patch("os.path.getmtime", return_value=1609459200)
    @patch("docx.Document")
    def test_document_metadata_extraction(
        self, mock_docx, mock_getmtime, mock_getctime, mock_getsize, mock_isfile
    ):
        """Core properties of a Word document are mapped onto DocumentMetadata."""
        mock_doc = mock_docx.return_value
        mock_doc.core_properties.author = "Author"
        mock_doc.core_properties.title = "Title"
        mock_doc.core_properties.language = "en"
        mock_doc.paragraphs = ["Paragraph 1", "Paragraph 2"]
        # This test mocks the python-docx reader, so it must pass a .docx
        # path: the extractor dispatches on the extension and a .pdf path
        # would take the (unmocked) PDF branch instead.
        docx_path = "../../files/test/dummy.docx"
        metadata = self.document_extractor.extract_metadata(docx_path)
        self.assertEqual(metadata.size, 2048)
        self.assertEqual(metadata.created_at, 1609459200)
        self.assertEqual(metadata.modified_at, 1609459200)
        self.assertEqual(metadata.num_pages, 2)
        self.assertEqual(metadata.author, "Author")
        self.assertEqual(metadata.title, "Title")
        self.assertEqual(metadata.language, "en")


# Run the tests in-process: a placeholder argv keeps unittest from parsing the
# host process's command line, and exit=False stops it from calling
# sys.exit() when the run finishes.
unittest.main(argv=[""], exit=False)

# Deduplicators


### Image Deduplicator


In [None]:
from typing import Dict, List, Optional, Tuple
from PIL import Image
import imagehash
from datasketch import MinHash, MinHashLSH
from concurrent.futures import ThreadPoolExecutor, as_completed
from pydantic import BaseModel


class DuplicateResult(BaseModel):
    """Outcome of checking one image against the images indexed so far."""

    # Path of the image that was checked.
    original: str
    # Indexed paths whose perceptual hash is similar enough to ``original``
    # (ImageDeduplicator._is_similar).
    duplicates: List[str]
    # Indexed paths whose perceptual hash equals that of ``original``
    # (ImageDeduplicator._is_exact_match).
    exact_matches: List[str]


class ImageDeduplicator:
    """Find near-duplicate and identical images using perceptual hashes.

    Each image is reduced to a perceptual hash (``imagehash.phash``). A
    MinHash LSH index proposes candidate matches, which are then confirmed
    by comparing the perceptual hashes directly.

    The index (``lsh`` and ``image_hashes``) is only ever modified from the
    calling thread; worker threads are used solely to compute hashes.
    """

    def __init__(
        self, threshold: float = 0.5, hash_size: int = 16, num_perm: int = 256
    ):
        """Create an empty index.

        Args:
            threshold: Minimum similarity for two images to count as
                duplicates; also passed to ``MinHashLSH`` as its threshold.
            hash_size: ``hash_size`` argument passed to ``imagehash.phash``.
            num_perm: Number of permutations used for every ``MinHash``.
        """
        self.threshold: float = threshold
        self.hash_size: int = hash_size
        self.num_perm: int = num_perm
        self.lsh: MinHashLSH = MinHashLSH(threshold=threshold, num_perm=num_perm)
        self.image_hashes: Dict[str, imagehash.ImageHash] = {}

    def _compute_hash(self, image_path: str) -> Optional[imagehash.ImageHash]:
        """Return the perceptual hash of ``image_path``, or ``None`` on failure."""
        try:
            # ``with`` releases the file handle once the hash is computed.
            with Image.open(image_path) as image:
                return imagehash.phash(image, hash_size=self.hash_size)
        except Exception:
            # Deliberate best effort: unreadable or non-image files are
            # skipped by the callers instead of aborting a whole batch.
            return None

    def _compute_minhash(self, phash: imagehash.ImageHash) -> Optional[MinHash]:
        """Return a ``MinHash`` built from the flattened values of ``phash``."""
        try:
            minhash = MinHash(num_perm=self.num_perm)
            # NOTE(review): every element is fed in as one byte without its
            # position. If phash.hash is a bit array, at most two distinct
            # tokens are ever added, so the LSH lookup cannot discriminate
            # between images and the direct hash comparison does all the
            # filtering. Confirm whether that is intended.
            for hash_value in phash.hash.flatten():
                minhash.update(int(hash_value).to_bytes(1, byteorder="big"))
            return minhash
        except Exception:
            return None

    def _index_hash(self, image_path: str, phash: imagehash.ImageHash) -> None:
        """Insert an already computed hash into the LSH index and hash table."""
        minhash = self._compute_minhash(phash)
        if minhash is None:
            return
        self.lsh.insert(image_path, minhash)
        self.image_hashes[image_path] = phash

    def _match_hash(self, phash: imagehash.ImageHash) -> Tuple[List[str], List[str]]:
        """Return ``(similar, exact)`` indexed paths for a computed hash."""
        minhash = self._compute_minhash(phash)
        if minhash is None:
            return [], []
        candidates = [
            path for path in self.lsh.query(minhash) if path in self.image_hashes
        ]
        similar_files = [
            path
            for path in candidates
            if self._is_similar(phash, self.image_hashes[path])
        ]
        exact_matches = [
            path
            for path in candidates
            if self._is_exact_match(phash, self.image_hashes[path])
        ]
        return similar_files, exact_matches

    def add_image(self, image_path: str) -> None:
        """Hash ``image_path`` and add it to the index; no-op if hashing fails."""
        phash = self._compute_hash(image_path)
        if phash is None:
            return
        self._index_hash(image_path, phash)

    def find_duplicates(self, image_path: str) -> Tuple[List[str], List[str]]:
        """Look up indexed images matching ``image_path``.

        Returns:
            ``(similar, exact)``: indexed paths whose hash similarity is at
            least ``threshold``, and indexed paths whose hash is identical.
            Both lists are empty when the image cannot be hashed.
        """
        phash = self._compute_hash(image_path)
        if phash is None:
            return [], []
        return self._match_hash(phash)

    def _is_similar(
        self, phash1: imagehash.ImageHash, phash2: imagehash.ImageHash
    ) -> bool:
        """Return True when the share of equal hash elements reaches ``threshold``."""
        similarity = 1 - (phash1 - phash2) / len(phash1.hash.flatten())
        return similarity >= self.threshold

    def _is_exact_match(
        self, phash1: imagehash.ImageHash, phash2: imagehash.ImageHash
    ) -> bool:
        """Return True when both perceptual hashes are equal."""
        return phash1 == phash2

    def deduplicate_image_paths(self, image_paths: List[str]) -> List[DuplicateResult]:
        """Check each image against the ones seen before it.

        Paths whose extension is not an image type are ignored, as are
        images that cannot be hashed. An image without any match is added
        to the index; an image with matches is reported and not indexed.

        Hashing runs in a thread pool, but lookups and inserts happen
        sequentially in input order. Doing them inside the workers let two
        copies of the same image both miss each other and exposed the index
        mid-update, which made results order-dependent and dropped images
        silently.

        Args:
            image_paths: Candidate file paths, in the order they should be
                considered.

        Returns:
            One ``DuplicateResult`` per image that matched an earlier one,
            in input order.
        """
        candidates = [
            image_path
            for image_path in image_paths
            if DirectoryFileOrganizer.get_file_type(image_path.split(".")[-1])
            == FileType.IMAGE
        ]

        with ThreadPoolExecutor() as executor:
            # executor.map keeps results aligned with ``candidates``.
            hashes = list(executor.map(self._compute_hash, candidates))

        results: List[DuplicateResult] = []
        for image_path, phash in zip(candidates, hashes):
            if phash is None:
                continue
            duplicates, exact_matches = self._match_hash(phash)
            if duplicates or exact_matches:
                results.append(
                    DuplicateResult(
                        original=image_path,
                        duplicates=duplicates,
                        exact_matches=exact_matches,
                    )
                )
            else:
                self._index_hash(image_path, phash)
        return results

    def _process_image(self, image_path: str) -> Tuple[List[str], List[str]]:
        """Look up one image and index it when nothing matches.

        Not safe to call from several threads at once: it reads and then
        modifies the shared index.
        """
        phash = self._compute_hash(image_path)
        if phash is None:
            return [], []
        duplicates, exact_matches = self._match_hash(phash)
        if not duplicates and not exact_matches:
            self._index_hash(image_path, phash)
        return duplicates, exact_matches


# Deduplicate every image found under directory_path (assigned earlier in
# this file by the example-usage statements).
deduplicator = ImageDeduplicator(threshold=0.9)
image_paths = DirectoryFileOrganizer.list_files_recursive(directory_path)
# Keep only the paths classified as images; grouping plain strings returns
# plain strings.
image_paths = DirectoryFileOrganizer.group_files_by_type(image_paths)[FileType.IMAGE]
print(len(image_paths))
deduplicator.deduplicate_image_paths(image_paths)