# Media Scanner

> Scans directories for media files with caching support

In [None]:
#| default_exp media.scanner

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import os
import time
from pathlib import Path
from typing import Any, List, Dict, Optional, Tuple
from fastcore.basics import patch

from cjm_fasthtml_workflow_transcription_single_file.media.config import MediaConfig
from cjm_fasthtml_workflow_transcription_single_file.media.models import MediaFile
from cjm_fasthtml_workflow_transcription_single_file.media.utils import format_file_size, format_timestamp, matches_patterns

## MediaScanner Class

Scans configured directories for audio and video files. Supports caching to avoid repeated filesystem access and filtering by media type.

In [None]:
#| export
class MediaScanner:
    """Finds media files in the configured directories, caching results per instance."""

    def __init__(
        self,
        config: MediaConfig  # Media configuration with directories and filters
    ):
        """Store the configuration and start with an empty cache."""
        # Nothing has been scanned yet: no cached list and a zero timestamp.
        self._cache_timestamp: float = 0
        self._cache: Optional[List[MediaFile]] = None
        self.config = config

In [None]:
#| export
@patch
def _is_cache_valid(
    self: MediaScanner
) -> bool:  # True if cache exists and hasn't expired
    """Report whether the cached scan results may still be used."""
    # Caching must be enabled and a previous scan must have stored a result.
    if self.config.cache_results and self._cache is not None:
        age_seconds = time.time() - self._cache_timestamp
        # The configured lifetime is in minutes; compare in seconds.
        return age_seconds < self.config.cache_duration_minutes * 60
    return False

In [None]:
#| export
@patch
def clear_cache(
    self: MediaScanner
) -> None:
    """Drop any cached scan results so the next scan hits the filesystem."""
    # Reset both fields to their initial (never-scanned) values.
    self._cache, self._cache_timestamp = None, 0

In [None]:
#| export
@patch
def _update_cache(
    self: MediaScanner, 
    files: List[MediaFile]  # List of scanned MediaFile objects
) -> None:
    """Store fresh scan results together with the time they were taken."""
    # The list is stored as-is (not copied) alongside the current wall-clock time.
    self._cache, self._cache_timestamp = files, time.time()

## File Scanning Methods

In [None]:
#| export
@patch
def _scan_directories(
    self: MediaScanner
) -> List[MediaFile]:  # List of MediaFile objects matching the configuration
    """Perform the actual directory scan (the cache is not consulted here).

    Globs every existing configured directory (recursively when
    `recursive_scan` is set) and drops directories, symlinks that should not
    be followed, hidden file names, excluded paths, files whose extension is
    not enabled, and files outside the configured size limits. The survivors
    are sorted with `_sort_files` and truncated to `max_results` when that
    limit is positive.

    Files that raise `OSError` while being inspected are skipped silently.
    """
    config = self.config

    # Map every *enabled* extension to its media type. Building the map only
    # from enabled categories keeps the reported type consistent with the
    # scan flags: an extension listed in both sets is no longer labelled
    # "video" when video scanning is switched off.
    type_by_ext: Dict[str, str] = {}
    if config.scan_audio:
        type_by_ext.update((ext.lower(), "audio") for ext in config.audio_extensions)
    if config.scan_video:
        # Applied last so that video wins for shared extensions when both
        # categories are enabled (same precedence as before).
        type_by_ext.update((ext.lower(), "video") for ext in config.video_extensions)

    if not type_by_ext:
        return []

    # Size limits in bytes; a non-positive limit disables that check.
    max_size = config.max_file_size_mb * 1024 * 1024
    min_size = config.min_file_size_kb * 1024

    # The glob pattern does not depend on the directory, so compute it once.
    pattern = "**/*" if config.recursive_scan else "*"

    media_files: List[MediaFile] = []

    for scan_dir in config.directories:
        scan_path = Path(scan_dir)
        if not scan_path.exists():
            continue

        for file_path in scan_path.glob(pattern):
            # Skip directories
            if file_path.is_dir():
                continue

            # Skip symlinks if not following them
            if file_path.is_symlink() and not config.follow_symlinks:
                continue

            # Skip hidden files if configured.
            # NOTE(review): only the file's own name is checked, so files
            # inside hidden directories are still included - confirm intended.
            if not config.include_hidden and file_path.name.startswith('.'):
                continue

            # Check exclude patterns
            if matches_patterns(str(file_path), config.exclude_patterns):
                continue

            # Extension without the leading dot, lower-cased ("" when absent)
            extension = file_path.suffix[1:].lower()
            media_type = type_by_ext.get(extension)
            if media_type is None:
                continue

            try:
                stats = file_path.stat()
                file_size = stats.st_size

                # Check size limits
                if min_size > 0 and file_size < min_size:
                    continue
                if max_size > 0 and file_size > max_size:
                    continue

                media_files.append(MediaFile(
                    path=str(file_path),
                    name=file_path.name,
                    extension=extension,
                    size=file_size,
                    size_str=format_file_size(file_size),
                    modified=stats.st_mtime,
                    modified_str=format_timestamp(stats.st_mtime),
                    media_type=media_type,
                    directory=str(file_path.parent)
                ))
            except OSError:
                # Skip files we can't access (PermissionError is an OSError)
                continue

    # Sort results
    media_files = self._sort_files(media_files)

    # Apply max results limit
    if config.max_results > 0 and len(media_files) > config.max_results:
        media_files = media_files[:config.max_results]

    return media_files

In [None]:
#| export
@patch
def _sort_files(
    self: MediaScanner,
    files: List[MediaFile]  # Files to sort
) -> List[MediaFile]:  # Sorted files
    """Order the files in place by the configured field and direction."""
    criterion = self.config.sort_by
    descending = self.config.sort_descending

    # One key function per supported sort field; names compare case-insensitively.
    key_functions = {
        "name": lambda item: item.name.lower(),
        "size": lambda item: item.size,
        "modified": lambda item: item.modified,
    }

    # An unrecognised sort field leaves the list in its current order.
    sort_key = key_functions.get(criterion)
    if sort_key is not None:
        files.sort(key=sort_key, reverse=descending)

    return files

In [None]:
#| export
@patch
def scan(
    self: MediaScanner, 
    force_refresh: bool = False  # Force a fresh scan, ignoring cache
) -> List[MediaFile]:  # List of MediaFile objects
    """Return the media files, rescanning only when the cache cannot be used."""
    # A forced refresh bypasses the validity check entirely.
    if force_refresh or not self._is_cache_valid():
        fresh_results = self._scan_directories()
        self._update_cache(fresh_results)
        return fresh_results

    return self._cache

In [None]:
#| export
@patch
def get_summary(
    self: MediaScanner
) -> Dict[str, Any]:  # Dictionary with total count, size, and breakdowns by type/extension
    """Summarise the scanned files: totals plus counts per type and extension."""
    scanned = self.scan()

    # Nothing found: return the fixed empty summary.
    if not scanned:
        return {
            "total_files": 0,
            "total_size": 0,
            "total_size_str": "0 B",
            "by_type": {},
            "by_extension": {}
        }

    # Gather the size total and both tallies in a single pass.
    size_total = 0
    type_counts = {}
    extension_counts = {}
    for item in scanned:
        size_total = size_total + item.size
        type_counts[item.media_type] = type_counts.get(item.media_type, 0) + 1
        # Extensions are reported upper-cased; files without one share a label.
        label = item.extension.upper() if item.extension else "NO EXT"
        extension_counts[label] = extension_counts.get(label, 0) + 1

    # Keep only the ten most frequent extensions, most common first.
    ranked = sorted(
        extension_counts.items(),
        key=lambda pair: pair[1],
        reverse=True
    )
    top_extensions = dict(ranked[:10])

    return {
        "total_files": len(scanned),
        "total_size": size_total,
        "total_size_str": format_file_size(size_total),
        "by_type": type_counts,
        "by_extension": top_extensions
    }

## Usage Examples

In [None]:
#| hide
# Calls nbdev's export entry point when this cell is run; presumably this
# regenerates the library module(s) from the `#| export` cells - TODO confirm.
import nbdev; nbdev.nbdev_export()