From 379ad576de60634d92935dcae70bdcf58101ea4c Mon Sep 17 00:00:00 2001 From: Rhet Turnbull Date: Fri, 24 May 2024 16:33:12 -0700 Subject: [PATCH] Updated content_tree to fallback to mdls if needed --- osxphotos/cli/import_cli.py | 186 +++++++++++++++++++++------------- osxphotos/image_file_utils.py | 30 +++++- 2 files changed, 144 insertions(+), 72 deletions(-) diff --git a/osxphotos/cli/import_cli.py b/osxphotos/cli/import_cli.py index 673ef816..3c2526e9 100644 --- a/osxphotos/cli/import_cli.py +++ b/osxphotos/cli/import_cli.py @@ -24,6 +24,7 @@ import click from rich.console import Console from rich.markdown import Markdown +from rich.progress import SpinnerColumn, Progress from strpdatetime import strpdatetime from osxphotos.fileutil import FileUtilMacOS @@ -1055,7 +1056,9 @@ def import_cli( report_file = render_and_validate_report(report) if report else None relative_to = pathlib.Path(relative_to) if relative_to else None - files_or_dirs = collect_files_to_import(files_or_dirs, walk, glob) + files_or_dirs = collect_files_to_import( + files_or_dirs, walk, glob, verbose, no_progress + ) if check_templates: check_templates_and_exit( files=files_or_dirs, @@ -1082,6 +1085,8 @@ def import_cli( exiftool_path, sidecar, sidecar_filename_template, + verbose, + no_progress, ) # need to get the library path to initialize FingerprintQuery @@ -1363,21 +1368,22 @@ def add_photo_to_albums_from_exportdb( dry_run: bool, ) -> list[str]: """Add photo to one or more albums from data found in export database""" - if photoinfo := export_db_get_photoinfo_for_filepath( - exportdb_path=exportdb_path, filepath=filepath, exiftool=exiftool_path - ): - if photoinfo.album_info: + with suppress(ValueError): + if photoinfo := export_db_get_photoinfo_for_filepath( + exportdb_path=exportdb_path, filepath=filepath, exiftool=exiftool_path + ): + if photoinfo.album_info: + verbose( + f"Setting albums from export database for [filename]{filepath.name}[/]" + ) + return add_photo_to_photoinfo_albums( + photo, photoinfo, filepath, verbose, dry_run + ) + else: verbose( - f"Setting albums from export database for [filename]{filepath.name}[/]" + f"Could not load metadata from export database for [filename]{filepath.name}[/]" ) - return add_photo_to_photoinfo_albums( - photo, photoinfo, filepath, verbose, dry_run - ) - else: - verbose( - f"Could not load metadata from export database for [filename]{filepath.name}[/]" - ) - return [] + return [] def add_duplicate_to_albums( @@ -2510,63 +2516,90 @@ def filename_matches_patterns(filename: str, patterns: tuple[str, ...]) -> bool: def collect_files_to_import( - files: tuple[str, ...], walk: bool, glob: tuple[str, ...] + files: tuple[str, ...], + walk: bool, + glob: tuple[str, ...], + verbose: Callable[..., None], + no_progress: bool, ) -> list[pathlib.Path]: """Collect files to import, recursively if necessary Args: files: list of initial files or directories to import walk: whether to walk directories - glob: optional glob patterns to match files + glob: glob patterns to match files or empty tuple if none + verbose: function to print verbose output + no_progress: if True, do not print progress bars Note: ignores any files that appear to be image sidecar files """ files_to_import = [] - for file in files: - if os.path.isfile(file): - files_to_import.append(file) - elif os.path.isdir(file): - if not walk: - # don't recurse but do collect all files in the directory - dir_files = [ - os.path.join(file, f) - for f in os.listdir(file) - if os.path.isfile(os.path.join(file, f)) - ] - files_to_import.extend(dir_files) + with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: + task = progress.add_task("Collecting files to import...", total=None) + for file in files: + if os.path.isfile(file): + files_to_import.append(file) + progress.advance(task) + elif os.path.isdir(file): + if not walk: + # don't recurse but do collect all files in the directory + dir_files = [ + os.path.join(file, f) + for f in os.listdir(file) + if os.path.isfile(os.path.join(file, f)) + ] + files_to_import.extend(dir_files) + progress.advance(task) + else: + for root, dirs, filenames in os.walk(file): + for file in filenames: + files_to_import.append(os.path.join(root, file)) + progress.advance(task) else: - for root, dirs, filenames in os.walk(file): - for file in filenames: - files_to_import.append(os.path.join(root, file)) - else: - continue + progress.advance(task) + continue if glob: + verbose("Filtering files with glob...") files_to_import = [ f for f in files_to_import if filename_matches_patterns(os.path.basename(f), glob) ] + + verbose(f"Getting absolute path of each import file...", level=2) files_to_import = [pathlib.Path(f).absolute() for f in files_to_import] - # strip any sidecar files - files_to_import = [ - f for f in files_to_import if f.suffix.lower() not in [".json", ".xmp"] - ] + # files_to_import = [ + # f for f in files_to_import if f.suffix.lower() not in [".json", ".xmp"] + # ] - # strip osxphotos export db in case importing an osxphotos export - files_to_import = [f for f in files_to_import if not f.name == OSXPHOTOS_EXPORT_DB] + # # strip osxphotos export db in case importing an osxphotos export + # files_to_import = [f for f in files_to_import if not f.name == OSXPHOTOS_EXPORT_DB] - # strip any files starting with a . - files_to_import = [f for f in files_to_import if not f.name.startswith(".")] + # # strip any files starting with a . + # files_to_import = [f for f in files_to_import if not f.name.startswith(".")] + + # keep only image files, video files, and .aae files + # files_to_import = [ + # f + # for f in files_to_import + # if is_image_file(f) or is_video_file(f) or f.suffix.lower() == ".aae" + # ] + # return files_to_import # keep only image files, video files, and .aae files - files_to_import = [ - f - for f in files_to_import - if is_image_file(f) or is_video_file(f) or f.suffix.lower() == ".aae" - ] - return files_to_import + filtered_file_list = [] + with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: + task = progress.add_task( + "Filtering import list for image & video files...", + total=len(files_to_import), + ) + for f in files_to_import: + if is_image_file(f) or is_video_file(f) or f.suffix.lower() == ".aae": + filtered_file_list.append(f) + progress.advance(task) + return filtered_file_list def group_files_to_import( @@ -2577,36 +2610,49 @@ def group_files_to_import( exiftool_path: str | None, sidecar: bool, sidecar_filename_template: str | None, + verbose: Callable[..., None], + no_progress: bool, ) -> list[tuple[pathlib.Path, ...]]: """Group files by live photo, burst UUID, raw+jpeg, etc.""" # first collect all files by parent directory files_by_parent = {} - for file in files: - parent = file.parent - if parent not in files_by_parent: - files_by_parent[parent] = [] - files_by_parent[parent].append(file) + with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: + task = progress.add_task( + "Grouping files by parent directory...", total=len(files) + ) + for file in files: + parent = file.parent + if parent not in files_by_parent: + files_by_parent[parent] = [] + files_by_parent[parent].append(file) + progress.advance(task) # walk through each parent directory and group files by same stem grouped_files = [] - for parent, files in files_by_parent.items(): - files_by_stem, remainder = group_files_by_stem(files) - if files_by_stem: - grouped_files.extend(files_by_stem) - bursts, remainder = group_files_by_burst_uuid(remainder) - if bursts: - grouped_files.extend(bursts) - grouped_files, remainder = group_files_with_edited(grouped_files, remainder) - grouped_files, remainder = group_files_with_edited_suffix( - grouped_files, - remainder, - edited_suffix, - relative_filepath, - exiftool_path, - sidecar, - sidecar_filename_template, - ) - grouped_files.extend(tuple([r]) for r in remainder) + with rich_progress(console=get_verbose_console(), mock=no_progress) as progress: + task = progress.add_task( + "Grouping files into import groups...", total=len(files_by_parent) + ) + for parent, files in files_by_parent.items(): + files_by_stem, remainder = group_files_by_stem(files) + if files_by_stem: + grouped_files.extend(files_by_stem) + bursts, remainder = group_files_by_burst_uuid(remainder) + if bursts: + grouped_files.extend(bursts) + grouped_files, remainder = group_files_with_edited(grouped_files, remainder) + grouped_files, remainder = group_files_with_edited_suffix( + grouped_files, + remainder, + edited_suffix, + relative_filepath, + exiftool_path, + sidecar, + sidecar_filename_template, + ) + grouped_files.extend(tuple([r]) for r in remainder) + progress.advance(task) + files_to_import = [] for group in grouped_files: files_to_import.append(sort_paths(group)) diff --git a/osxphotos/image_file_utils.py b/osxphotos/image_file_utils.py index ae77c63f..2a867df7 100644 --- a/osxphotos/image_file_utils.py +++ b/osxphotos/image_file_utils.py @@ -10,6 +10,8 @@ from functools import cache from typing import Any from weakref import ref +import subprocess +import logging from osxphotos.platform import assert_macos @@ -19,6 +21,8 @@ import makelive import osxmetadata +logger = logging.getLogger("osxphotos") + # regular expressions to match original + edited pairs # if a pair of photos matching these regular expressions is imported, Photos creates an edited photo on import ORIGINAL_RE = r"^(.*\/?)([A-Za-z]{3})_(\d{4}).*$" @@ -27,8 +31,30 @@ def content_tree(filepath: str | os.PathLike) -> list[str]: """Return the content tree for a file""" - md = osxmetadata.OSXMetaData(str(filepath)) - return md.get("kMDItemContentTypeTree") or [] + try: + md = osxmetadata.OSXMetaData(str(filepath)) + return md.get("kMDItemContentTypeTree") or [] + except Exception as e: + # sometimes osxmetadata fails on certain external volumes, so try using mdls + return content_tree_mdls(filepath) + + +def content_tree_mdls(filepath: str | os.PathLike) -> list[str]: + try: + result = subprocess.run( + ["mdls", "-raw", "-name", "kMDItemContentTypeTree", filepath], + capture_output=True, + text=True, + check=True, + ) + # Clean up the output by removing the enclosing parentheses and splitting into a list + output = result.stdout.strip() + output = output.strip("()%") + content_types = [item.strip().strip('"') for item in output.split(",")] + return content_types + except subprocess.CalledProcessError as e: + logger.warning(f"Error executing mdls command to get content tree: {e}") + return [] @cache