diff --git a/docs/macros.md b/docs/macros.md index 9268d267a..41f7edd55 100644 --- a/docs/macros.md +++ b/docs/macros.md @@ -48,3 +48,20 @@ Tool is in development. Will allow for user-defined sorting of [fields](fields.m ### Folders to Tags Creates tags from the existing folder structure in the library, which are previewed in a hierarchy view for the user to confirm. A tag will be created for each folder and applied to all entries, with each subfolder being linked to the parent folder as a [parent tag](tags.md#parent-tags). Tags will initially be named after the folders, but can be fully edited and customized afterwards. + +### Paths to Fields + +Populates fields on entries based on their file paths. Users can define regular expressions to extract specific parts of the path, which can be referenced when adding a field. +In addition, simple operations (`++` and `--`) can be applied to numeric fields. This allows 0-indexed fields to be converted to 1-indexed fields, and vice versa. +Example usage: +: Say you have paths like +: `TagStudioLibrary/artist-artistusername/series name/work title --- page 0.png` +: We want to extract `artistusername`, `series name`, `work title`, and `0` (the page number). +: To do this, we can define an expression to fully constrain our path. We *can* allow looser constraints; however, if we do that, we need to be more careful to ensure the preview matches our desired outcome. +: Here are some handy pieces: +: * `[^\.]+$` - This matches anything after the final `.` in the path. In other words, the file extension. Even if your path contains a `.`, this ensures the matching does not end early. `$` is an anchor to the end of the line. Similarly, `^` is the anchor to the start, so it can be used at the beginning. We need to escape `.` with a `\`, because `.` means "match any character once" in regex. `+` means "match this pattern one or more times". +: * `\\` and `\/` - these match your directory (folder) separators. 
Which you use can depend on your operating system, so use of `[\\\/]` (which matches both) is encouraged. +: * `[^\\\/]+` - Similar to the previous, but this does the opposite. This matches as many characters as it can, before it runs into a folder separator. This is helpful in ensuring that each field you capture is truly at the folder level you expect, and was not matched merely because the name of an internal folder is similar to that of an external one. +: * `\d+` and `\s+` - These match one or more digits and one or more whitespace characters (like spaces and tabs), respectively. If you need to further constrain this, you can use `\s?` (match a space if it's there, otherwise continue) or `\d{3,5}` (match 3 to 5 digits only) to do so. +: * `(?P<name_of_group>match_pattern)` - This is a named capture group. We can define `match_pattern` to match the field we want, and make `name_of_group` our field name. This allows us to use `$name_of_group` to reference the item. If these groups were unnamed, we would need to count the order in which they occur, and use their number (i.e., the first item is `$1`). 
+: Putting this together, we can make our regex capture: `artist-(?P<artist>[^\\\/]+)[\\\/](?P<series>[^\\\/]+)[\\\/](?P<title>.+) --- page\s?(?P<page>\d+)\.[^\.]+$` \ No newline at end of file diff --git a/src/tagstudio/core/library/alchemy/library.py b/src/tagstudio/core/library/alchemy/library.py index a25231e95..b03118266 100644 --- a/src/tagstudio/core/library/alchemy/library.py +++ b/src/tagstudio/core/library/alchemy/library.py @@ -966,6 +966,12 @@ def remove_entries(self, entry_ids: list[int]) -> None: session.query(Entry).where(Entry.id.in_(sub_list)).delete() session.commit() + def entry_count(self) -> int: + """Return the total number of entries in the library.""" + with Session(self.engine) as session: + count = session.scalar(select(func.count(Entry.id))) + return int(count or 0) + def has_path_entry(self, path: Path) -> bool: """Check if item with given path is in library already.""" with Session(self.engine) as session: @@ -1262,6 +1268,73 @@ def get_value_type(self, field_key: str) -> ValueType: session.expunge(field) return field + def add_value_type( + self, + key: str, + *, + name: str | None = None, + field_type: FieldTypeEnum = FieldTypeEnum.TEXT_LINE, + is_default: bool = False, + position: int | None = None, + ) -> ValueType: + """Create a new ValueType row and return it. + + - Preserves the provided `key` as-is. + - Derives a display `name` from key when not provided. + - Appends to the end of current field positions when `position` is not provided. 
+ """ + display_name = name or key.replace("_", " ").title() + + with Session(self.engine) as session: + existing = session.scalar(select(ValueType).where(ValueType.key == key)) + if existing: + session.expunge(existing) + return existing + + if position is None: + max_pos = session.scalar(select(func.max(ValueType.position))) + position = (max_pos or 0) + 1 + + vt = ValueType( + key=key, + name=display_name, + type=field_type, + is_default=is_default, + position=position, + ) + try: + session.add(vt) + session.commit() + session.expunge(vt) + except IntegrityError: + session.rollback() + # Fetch the existing row to return a consistent object + vt = unwrap(session.scalar(select(ValueType).where(ValueType.key == key))) + session.expunge(vt) + return vt + + def ensure_value_type( + self, + key: str, + *, + name: str | None = None, + field_type: FieldTypeEnum = FieldTypeEnum.TEXT_LINE, + is_default: bool = False, + ) -> ValueType: + """Get or create a `ValueType` with the provided key. + + Returns the existing type when present; otherwise creates it. + """ + try: + return self.get_value_type(key) + except Exception: + return self.add_value_type( + key, + name=name, + field_type=field_type, + is_default=is_default, + ) + def add_field_to_entry( self, entry_id: int, diff --git a/src/tagstudio/qt/mixed/paths_to_fields.py b/src/tagstudio/qt/mixed/paths_to_fields.py new file mode 100644 index 000000000..6dbc6b746 --- /dev/null +++ b/src/tagstudio/qt/mixed/paths_to_fields.py @@ -0,0 +1,860 @@ +# TODO list +# UI bugs +# - When preview loads, it extends below the apply button, likely because scrollbar isn't calculated +# - Multi-line fields sometimes get cut off when adding/removing mappings so they show up as 1 line. 
+from __future__ import annotations + +import re +from collections.abc import Callable, Iterable, Iterator +from contextlib import suppress +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from PySide6.QtCore import Qt, QThreadPool +from PySide6.QtGui import QTextOption +from PySide6.QtWidgets import ( + QCheckBox, + QComboBox, + QFormLayout, + QFrame, + QHBoxLayout, + QLabel, + QLineEdit, + QMessageBox, + QPlainTextEdit, + QProgressBar, + QPushButton, + QSizePolicy, + QVBoxLayout, + QWidget, +) + +from tagstudio.core.library.alchemy.enums import FieldTypeEnum +from tagstudio.core.library.alchemy.fields import FieldID +from tagstudio.core.library.alchemy.library import Library +from tagstudio.core.library.alchemy.models import Entry +from tagstudio.core.utils.types import unwrap +from tagstudio.qt.translations import Translations +from tagstudio.qt.utils.custom_runnable import CustomRunnable +from tagstudio.qt.utils.function_iterator import FunctionIterator + +if TYPE_CHECKING: + from tagstudio.qt.ts_qt import QtDriver + + +@dataclass +class PathFieldRule: + """Define how to extract data from a path and map to fields. + + pattern: Full regex applied to the entry path (string form). Supports + numbered groups ($1) and named groups ($name / ${name}). + fields: A list of (field_key, template) pairs. Templates can contain + placeholders like "$1", "$name", or "${name}". Dicts are accepted + for backward compatibility and will be converted preserving iteration order. + use_filename_only: If True, match only against the filename, else full path. + flags: Regex flags OR'd, e.g. re.IGNORECASE. + """ + + pattern: str + fields: list[tuple[str, str]] + use_filename_only: bool = False + flags: int = 0 + + def __post_init__(self) -> None: + # Back-compat: allow callers/tests to pass a dict mapping. 
+ if isinstance(self.fields, dict): + self.fields = list(self.fields.items()) + + def compile(self) -> re.Pattern[str]: + return re.compile(self.pattern, self.flags) + + +@dataclass +class EntryFieldUpdate: + entry_id: int + path: str + # list of (field_key, value) to preserve duplicates and order + updates: list[tuple[str, str]] = field(default_factory=list) + + +@dataclass +class PreviewProgress: + index: int + total: int | None + path: str + update: EntryFieldUpdate | None + + +PLACEHOLDER_RE = re.compile( + r"\$(?:\{(?P<n1>[A-Za-z_][A-Za-z0-9_]*)\}|(?P<n2>[A-Za-z_][A-Za-z0-9_]*)|(?P<i>\d+))(?P<op>\+\+|--)?" +) + + +def _expand_template(template: str, match: re.Match[str]) -> str: + def repl(m: re.Match[str]) -> str: + original = "" + if (idx := m.group("i")) is not None: + try: + original = match.group(int(idx)) or "" + except IndexError: + original = "" + else: + name = m.group("n1") or m.group("n2") + if name: + original = match.groupdict().get(name, "") or "" + + op = m.group("op") + if not op: + return original + + # Apply simple numeric transforms with zero-fill preservation + if original.isdigit(): + width = len(original) + try: + num = int(original) + if op == "++": + num += 1 + elif op == "--": + num -= 1 + return str(num).zfill(width) + except ValueError: + return original + return original + + return PLACEHOLDER_RE.sub(repl, template) + + +def _iter_entries(library: Library) -> Iterable[Entry]: + # with_joins=True ensures we can inspect current fields when needed + yield from library.all_entries(with_joins=True) + +def iter_preview_paths_to_fields( + library: Library, + rules: list[PathFieldRule], + only_unset: bool = True, + *, + cancel_callback: Callable[[], bool] | None = None, +) -> Iterator[PreviewProgress]: + compiled = [(r, r.compile()) for r in rules] + try: + total = library.entry_count() + except Exception: + total = None + + base_path = None + try: + folder_obj = getattr(library, "folder", None) + if folder_obj is not None: + base_path = 
getattr(folder_obj, "path", None) + except Exception: + base_path = None + + for index, entry in enumerate(_iter_entries(library), start=1): + if cancel_callback and cancel_callback(): + break + + try: + if base_path is not None: + rel = entry.path.relative_to(base_path) + full_path = rel.as_posix() + else: + full_path = ( + entry.path.as_posix() + if hasattr(entry.path, "as_posix") + else str(entry.path).replace("\\", "/") + ) + except Exception: + full_path = ( + entry.path.as_posix() + if hasattr(entry.path, "as_posix") + else str(entry.path).replace("\\", "/") + ) + + pending_list: list[tuple[str, str]] = [] + + skip_keys: set[str] = set() + if only_unset: + for f in entry.fields: + if (f.value or "") != "": + skip_keys.add(f.type_key) + + for rule, cre in compiled: + target = entry.filename if rule.use_filename_only else full_path + m = cre.search(target) + if not m: + continue + + for key, tmpl in rule.fields: + if only_unset and key in skip_keys: + continue + value = _expand_template(tmpl, m).strip() + if value == "": + continue + + pending_list.append((key, value)) + + update = None + if pending_list: + update = EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending_list) + + yield PreviewProgress(index=index, total=total, path=full_path, update=update) + + +def preview_paths_to_fields( + library: Library, + rules: list[PathFieldRule], + only_unset: bool = True, +) -> list[EntryFieldUpdate]: + """Return a dry-run of field updates inferred from entry paths. + + - Respects existing non-empty field values when only_unset=True. + - Supports multiple rules; first matching rule contributes its mapped fields. 
+ """ + results: list[EntryFieldUpdate] = [] + for progress in iter_preview_paths_to_fields(library, rules, only_unset=only_unset): + if progress.update: + results.append(progress.update) + return results + + +# ** TODO: document the optional 'field_types' parameter (maps field keys to FieldTypeEnum) +def apply_paths_to_fields( + library: Library, + updates: list[EntryFieldUpdate], + *, + create_missing_field_types: bool = True, + overwrite: bool = False, + field_types: dict[str, FieldTypeEnum] | None = None, + allow_existing: bool = False, +) -> int: + """Apply field updates to entries. + + - If a field key doesn't exist, optionally create a new ValueType. + - If the field already exists on an entry: + - Overwrite when overwrite=True + - Otherwise only fill when existing value is empty or None unless allow_existing=True, + in which case new values are appended without replacing existing ones. + + Returns the count of individual field updates applied. + """ + applied = 0 + + for upd in updates: + entry = unwrap(library.get_entry_full(upd.entry_id)) + + # Group proposed updates by field key to handle duplicates and overwrites deterministically + grouped: dict[str, list[str]] = {} + for key, value in upd.updates: + grouped.setdefault(key, []).append(value) + + for key, values in grouped.items(): + # ensure field type exists if requested + if create_missing_field_types: + _ensure_fn = getattr(library, "ensure_value_type", None) + ftype = FieldTypeEnum.TEXT_LINE + if field_types and key in field_types: + ftype = field_types[key] + if callable(_ensure_fn): + _ensure_fn(key, name=None, field_type=ftype) + else: + try: + library.get_value_type(key) + except Exception: + _create_fn = ( + getattr(library, "create_value_type", None) + or getattr(library, "add_value_type", None) + ) + if callable(_create_fn): + _create_fn(key, name=None, field_type=ftype) + else: + library.get_value_type(key) + else: + library.get_value_type(key) + + existing_fields = [f for f in entry.fields 
if f.type_key == key] + existing_values = [(f.value or "") for f in existing_fields] + # De-duplicate incoming values while preserving order + seen: set[str] = set() + dedup_values: list[str] = [] + for v in values: + if v not in seen: + dedup_values.append(v) + seen.add(v) + values = dedup_values + + if overwrite: + # Overwrite existing in order, then append any remaining values + for i, val in enumerate(values): + if i < len(existing_fields): + # Only write if changing the value + if (existing_values[i] if i < len(existing_values) else "") != val: + library.update_entry_field(entry.id, existing_fields[i], val) + applied += 1 + else: + # Skip appending if exact duplicate already exists + if val in existing_values: + continue + if library.add_field_to_entry(entry.id, field_id=key, value=val): + applied += 1 + continue + + if not allow_existing and any(val != "" for val in existing_values): + continue + + # Fill empty slots first without disturbing existing populated values + remaining: list[str] = [] + seen_existing = set(existing_values) + for val in values: + if val in seen_existing: + continue + if val not in remaining: + remaining.append(val) + + for f in existing_fields: + if not remaining: + break + current = f.value or "" + if current != "": + continue + next_val = remaining.pop(0) + if current != next_val: + library.update_entry_field(entry.id, f, next_val) + applied += 1 + existing_values.append(next_val) + seen_existing.add(next_val) + + for val in remaining: + if val in seen_existing: + continue + if library.add_field_to_entry(entry.id, field_id=key, value=val): + applied += 1 + seen_existing.add(val) + existing_values.append(val) + + return applied + + +# ================= UI: Paths → Fields Modal ================ + + +class _MappingRow(QWidget): + def __init__(self, parent: QWidget | None = None) -> None: + super().__init__(parent) + layout = QHBoxLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + # Field selector: choose from built-in FieldID + 
self.field_select = QComboBox() + for fid in FieldID: + self.field_select.addItem(fid.value.name, fid.name) + # Single-line editor + self.val_edit_line = QLineEdit() + self.val_edit_line.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + # Multi-line editor (for TEXT_BOX fields) + self.val_edit_box = QPlainTextEdit() + self.val_edit_box.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + self.val_edit_box.setFixedHeight(64) + self.remove_btn = QPushButton("-") + self.remove_btn.setFixedWidth(28) + layout.addWidget(self.field_select) + layout.addWidget(self.val_edit_line) + layout.addWidget(self.val_edit_box) + layout.addWidget(self.remove_btn) + + # Start with proper editor based on current selection + self._update_editor_kind() + self.field_select.currentIndexChanged.connect(self._update_editor_kind) + + + def as_pair(self) -> tuple[str, str] | None: + editor = self._current_value_editor() + v = ( + editor.toPlainText().strip() + if isinstance(editor, QPlainTextEdit) + else editor.text().strip() + ) + if not v: + return None + fid_name = self.field_select.currentData() + return (str(fid_name), v) + + def _current_value_editor(self) -> QLineEdit | QPlainTextEdit: + # TEXT_BOX => multi-line, else single-line + try: + fid_name = self.field_select.currentData() + ftype = ( + FieldID[fid_name].value.type + if fid_name in FieldID.__members__ + else FieldTypeEnum.TEXT_LINE + ) + except Exception: + ftype = FieldTypeEnum.TEXT_LINE + return self.val_edit_box if ftype == FieldTypeEnum.TEXT_BOX else self.val_edit_line + + def _update_editor_kind(self) -> None: + editor = self._current_value_editor() + use_box = isinstance(editor, QPlainTextEdit) + self.val_edit_box.setVisible(use_box) + self.val_edit_line.setVisible(not use_box) + + + +class PathsToFieldsModal(QWidget): + def __init__(self, library: Library, driver: QtDriver) -> None: + super().__init__() + self.library = library + self.driver = driver + 
self.setWindowTitle(Translations["paths_to_fields.title"]) # fallback shows [key] + self.setWindowModality(Qt.WindowModality.ApplicationModal) + self.setMinimumSize(720, 640) + + self._preview_results: list[EntryFieldUpdate] = [] + self._preview_running = False + self._apply_running = False + self._cancel_preview = False + self._preview_iterator: FunctionIterator | None = None + self._preview_runnable: CustomRunnable | None = None + self._apply_iterator: FunctionIterator | None = None + self._apply_runnable: CustomRunnable | None = None + self._progress_prefix = "" + self._progress_cancel_handler: Callable[[], None] | None = None + + root = QVBoxLayout(self) + root.setContentsMargins(8, 8, 8, 8) + + title = QLabel(Translations["paths_to_fields.title"]) # may show [paths_to_fields.title] + title.setAlignment(Qt.AlignmentFlag.AlignCenter) + title.setStyleSheet("font-weight:600;font-size:14px;padding:6px 0") + desc = QLabel( + Translations[ + "paths_to_fields.description" + ] + ) + desc.setWordWrap(True) + desc.setAlignment(Qt.AlignmentFlag.AlignCenter) + + # Pattern and options (use a FormLayout to tie label to input) + form = QWidget() + form_layout = QFormLayout(form) + form_layout.setContentsMargins(0, 0, 0, 0) + form_layout.setFormAlignment(Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignTop) + form_layout.setLabelAlignment(Qt.AlignmentFlag.AlignLeft) + form_layout.setFieldGrowthPolicy(QFormLayout.FieldGrowthPolicy.ExpandingFieldsGrow) + + pattern_label = QLabel(Translations["paths_to_fields.pattern_label"]) + self.pattern_edit = QPlainTextEdit() + self.pattern_edit.setPlaceholderText(r"^(?P<folder>[^/]+)/(?P<stem>[^_]+)_(?P<page>\d+)\.[^.]+$") + self.pattern_edit.setFixedHeight(80) + self.pattern_edit.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + pattern_label.setBuddy(self.pattern_edit) + + self.filename_only_cb = QCheckBox(Translations["paths_to_fields.use_filename_only"]) + self.allow_existing_cb = 
QCheckBox(Translations["paths_to_fields.allow_existing"]) + + form_layout.addRow(pattern_label, self.pattern_edit) + form_layout.addRow(self.filename_only_cb) + form_layout.addRow(self.allow_existing_cb) + + # Ensure the form block doesn't vertically stretch on resize + form.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) + + # Mappings section + map_label = QLabel(Translations["paths_to_fields.mappings_label"]) + map_container = QWidget() + self.map_v = QVBoxLayout(map_container) + self.map_v.setContentsMargins(0, 0, 0, 0) + self.map_v.setSpacing(6) + # Keep mappings area height fixed to its contents + map_container.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) + + self.add_map_btn = QPushButton(Translations["paths_to_fields.add_mapping"]) + self.add_map_btn.clicked.connect(self._add_mapping_row) + + # Preview area + self.preview_btn = QPushButton(Translations["paths_to_fields.preview"]) + self.preview_btn.clicked.connect(self._on_preview) + self.preview_area = QPlainTextEdit() + self.preview_area.setReadOnly(True) + self.preview_area.setFrameShape(QFrame.Shape.StyledPanel) + self.preview_area.setPlaceholderText(Translations["paths_to_fields.preview_empty"]) + self.preview_area.setMinimumHeight(200) + self.preview_area.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + self.preview_area.setWordWrapMode(QTextOption.WrapMode.WrapAtWordBoundaryOrAnywhere) + + self.progress_container = QWidget() + self.progress_container.setVisible(False) + self.progress_container.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + progress_layout = QVBoxLayout(self.progress_container) + progress_layout.setContentsMargins(0, 0, 0, 0) + progress_layout.setSpacing(4) + + self.progress_label = QLabel() + self.progress_label.setWordWrap(True) + self.progress_label.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + + progress_bar_row = QHBoxLayout() + 
progress_bar_row.setContentsMargins(0, 0, 0, 0) + progress_bar_row.setSpacing(6) + + self.progress_bar = QProgressBar() + self.progress_bar.setMinimumWidth(240) + self.progress_bar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + self.progress_bar.setTextVisible(False) + + self.progress_cancel_btn = QPushButton(Translations["generic.cancel"]) + self.progress_cancel_btn.setVisible(False) + self.progress_cancel_btn.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed) + self.progress_cancel_btn.clicked.connect(self._handle_progress_cancel) + + progress_bar_row.addWidget(self.progress_bar) + progress_bar_row.addWidget(self.progress_cancel_btn) + + progress_layout.addWidget(self.progress_label) + progress_layout.addLayout(progress_bar_row) + + # Apply + self.apply_btn = QPushButton(Translations["generic.apply_alt"]) # existing key + self.apply_btn.setMinimumWidth(100) + self.apply_btn.clicked.connect(self._on_apply) + + # Ensure pressing Enter in editors doesn't trigger any default button + # Explicitly disable default behaviors on buttons + for b in (self.preview_btn, self.apply_btn): + try: + b.setAutoDefault(False) + b.setDefault(False) + except Exception: + pass + + # Layout assembly + root.addWidget(title) + root.addWidget(desc) + root.addWidget(form) + root.addWidget(map_label) + root.addWidget(map_container) + root.addWidget(self.add_map_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.preview_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.progress_container) + root.addWidget(self.preview_area) + root.addWidget(self.apply_btn, alignment=Qt.AlignmentFlag.AlignCenter) + + # Make only the preview area consume extra vertical space on resize + root.setStretchFactor(self.preview_area, 1) + + # Seed one mapping row + self._add_mapping_row() + + def _add_mapping_row(self): + row = _MappingRow() + row.remove_btn.clicked.connect(lambda: self._remove_row(row)) + self.map_v.addWidget(row) + + def 
_remove_row(self, row: _MappingRow): + row.setParent(None) + + def _collect_rules(self) -> tuple[list[PathFieldRule], dict[str, FieldTypeEnum]] | None: + pattern = self.pattern_edit.toPlainText().strip() + if not pattern: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Warning) + msg_box.setWindowTitle(Translations["window.title.error"]) # reuse common title + msg_box.setText(Translations["paths_to_fields.msg.enter_pattern"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + fields_list: list[tuple[str, str]] = [] + f_types: dict[str, FieldTypeEnum] = {} + for i in range(self.map_v.count()): + w = self.map_v.itemAt(i).widget() + if isinstance(w, _MappingRow): + kv = w.as_pair() + if kv: + fields_list.append(kv) + # No custom fields support in UI; backend keeps optional field_types for tests + if not fields_list: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Warning) + msg_box.setWindowTitle(Translations["window.title.error"]) # reuse common title + msg_box.setText(Translations["paths_to_fields.msg.add_mapping"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + try: + re.compile(pattern) + except re.error as e: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Critical) + msg_box.setWindowTitle(Translations["paths_to_fields.msg.invalid_regex_title"]) + msg_box.setText(Translations["paths_to_fields.msg.invalid_regex_title"]) + msg_box.setInformativeText(str(e)) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + rule = PathFieldRule( + pattern=pattern, + fields=fields_list, + use_filename_only=self.filename_only_cb.isChecked(), + ) + return [rule], f_types + + def _on_preview(self): + if self._preview_running or self._apply_running: + return + r = self._collect_rules() + if not r: + return + rules, _ = r + 
self.preview_area.clear() + self._preview_results = [] + + try: + total = self.library.entry_count() + except Exception: + total = None + + self._cancel_preview = False + self._preview_running = True + self._set_controls_enabled(enabled=False) + + self._start_progress( + label=Translations["paths_to_fields.preview"], + total=total, + cancel_handler=self._request_preview_cancel, + ) + + def generator(): + return iter_preview_paths_to_fields( + self.library, + rules, + only_unset=False, + cancel_callback=lambda: self._cancel_preview, + ) + + iterator = FunctionIterator(generator) + iterator.value.connect(self._handle_preview_progress) + + runnable = CustomRunnable(iterator.run) + runnable.done.connect(self._finalize_preview) + + self._preview_iterator = iterator + self._preview_runnable = runnable + QThreadPool.globalInstance().start(runnable) + + def _on_apply(self): + if self._preview_running or self._apply_running: + return + r = self._collect_rules() + if not r: + return + rules, f_types = r + allow_existing = self.allow_existing_cb.isChecked() + previews = preview_paths_to_fields( + self.library, + rules, + only_unset=not allow_existing, + ) + if not previews: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Information) + msg_box.setWindowTitle(Translations["paths_to_fields.title"]) # use modal title + msg_box.setText(Translations["paths_to_fields.msg.no_matches"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return + + total = len(previews) + self._apply_running = True + self._set_controls_enabled(enabled=False) + self._start_progress( + label=Translations["paths_to_fields.progress.label.initial"], + total=total, + cancel_handler=None, + ) + + def generator(): + return self._iter_apply_updates(previews, f_types, allow_existing) + + iterator = FunctionIterator(generator) + iterator.value.connect(self._handle_apply_progress) + + runnable = CustomRunnable(iterator.run) + 
runnable.done.connect(self._finalize_apply) + + self._apply_iterator = iterator + self._apply_runnable = runnable + QThreadPool.globalInstance().start(runnable) + + def _iter_apply_updates( + self, + previews: list[EntryFieldUpdate], + field_types: dict[str, FieldTypeEnum], + allow_existing: bool, + ) -> Iterator[PreviewProgress]: + try: + from tagstudio.core.library.alchemy import library as _libmod # local import + except Exception: + _libmod = None + + class _NoInfoLogger: + def __init__(self, base): + self._base = base + + def info(self, *_, **__): # suppress info noise during bulk apply + return None + + def debug(self, *_, **__): + return None + + def warning(self, *args, **kwargs): + return self._base.warning(*args, **kwargs) + + def error(self, *args, **kwargs): + return self._base.error(*args, **kwargs) + + def exception(self, *args, **kwargs): + return self._base.exception(*args, **kwargs) + + def __getattr__(self, name): + return getattr(self._base, name) + + _saved_logger = None + if _libmod is not None and hasattr(_libmod, "logger"): + _saved_logger = _libmod.logger + _libmod.logger = _NoInfoLogger(_saved_logger) + + total = len(previews) + try: + for index, upd in enumerate(previews, start=1): + apply_paths_to_fields( + self.library, + [upd], + create_missing_field_types=True, + field_types=field_types, + allow_existing=allow_existing, + ) + yield PreviewProgress(index=index, total=total, path=upd.path, update=upd) + finally: + if _saved_logger is not None and _libmod is not None: + _libmod.logger = _saved_logger + + def _append_preview_update(self, upd: EntryFieldUpdate) -> None: + lines = [upd.path] + entry = unwrap(self.library.get_entry_full(upd.entry_id)) + for key, value in upd.updates: + existing_vals = [f.value or "" for f in entry.fields if f.type_key == key] + allow_existing = self.allow_existing_cb.isChecked() + # Flag duplicates before generic already_set so we only warn for actual conflicts + if value in existing_vals and value != "": + 
marker = Translations["paths_to_fields.preview.markers.duplicate"] + else: + already_set = any(val != "" for val in existing_vals) + marker = ( + Translations["paths_to_fields.preview.markers.already_set"] + if already_set and not allow_existing + else None + ) + prefix = f"⚠ {marker} — " if marker else "" + lines.append(f" - {prefix}{key}: {value}") + self.preview_area.appendPlainText("\n".join(lines)) + self.preview_area.ensureCursorVisible() + + def _handle_preview_progress(self, progress: PreviewProgress) -> None: + self._update_progress(progress) + if progress.update: + self._preview_results.append(progress.update) + self._append_preview_update(progress.update) + + def _handle_apply_progress(self, progress: PreviewProgress) -> None: + self._update_progress(progress) + + def _update_progress(self, progress: PreviewProgress) -> None: + total = progress.total or 0 + if total > 0: + self.progress_bar.setRange(0, total) + self.progress_bar.setValue(min(progress.index, total)) + else: + self.progress_bar.setRange(0, 0) + + lines: list[str] = [] + if self._progress_prefix: + lines.append(self._progress_prefix) + if progress.total: + lines.append(f"{progress.index}/{progress.total}") + else: + lines.append(str(progress.index)) + if progress.path: + lines.append(progress.path) + self.progress_label.setText("\n".join(filter(None, lines))) + + def _start_progress( + self, + *, + label: str, + total: int | None, + cancel_handler: Callable[[], None] | None, + ) -> None: + self._progress_prefix = label + self.progress_label.setText(label) + self.progress_container.setVisible(True) + if total and total > 0: + self.progress_bar.setRange(0, total) + self.progress_bar.setValue(0) + else: + self.progress_bar.setRange(0, 0) + self._set_cancel_handler(cancel_handler) + + def _finish_progress(self) -> None: + self.progress_container.setVisible(False) + self.progress_label.clear() + self.progress_bar.setValue(0) + self._progress_prefix = "" + self._set_cancel_handler(None) + + def 
_set_cancel_handler(self, handler: Callable[[], None] | None) -> None: + self._progress_cancel_handler = handler + has_handler = handler is not None + self.progress_cancel_btn.setVisible(has_handler) + self.progress_cancel_btn.setEnabled(has_handler) + + def _handle_progress_cancel(self) -> None: + if self._progress_cancel_handler: + self.progress_cancel_btn.setEnabled(False) + self._progress_cancel_handler() + + def _request_preview_cancel(self) -> None: + self._cancel_preview = True + + def _finalize_preview(self) -> None: + cancelled = self._cancel_preview + self._preview_running = False + self._cancel_preview = False + self._preview_iterator = None + self._preview_runnable = None + self._finish_progress() + self._set_controls_enabled(enabled=True) + if not self._preview_results and not cancelled: + self.preview_area.setPlainText(Translations["paths_to_fields.msg.no_matches"]) + + def _finalize_apply(self) -> None: + self._apply_running = False + self._apply_iterator = None + self._apply_runnable = None + self._finish_progress() + self._set_controls_enabled(enabled=True) + self.close() + with suppress(Exception): + self.driver.main_window.preview_panel.set_selection( + self.driver.selected, + update_preview=False, + ) + + def _set_controls_enabled(self, *, enabled: bool) -> None: + self.preview_btn.setEnabled(enabled) + self.apply_btn.setEnabled(enabled) + self.add_map_btn.setEnabled(enabled) + self.pattern_edit.setEnabled(enabled) + self.filename_only_cb.setEnabled(enabled) + self.allow_existing_cb.setEnabled(enabled) + for i in range(self.map_v.count()): + widget = self.map_v.itemAt(i).widget() + if isinstance(widget, _MappingRow): + widget.setEnabled(enabled) diff --git a/src/tagstudio/qt/ts_qt.py b/src/tagstudio/qt/ts_qt.py index 8d7edde30..f720191d8 100644 --- a/src/tagstudio/qt/ts_qt.py +++ b/src/tagstudio/qt/ts_qt.py @@ -84,6 +84,7 @@ from tagstudio.qt.mixed.folders_to_tags import FoldersToTagsModal from tagstudio.qt.mixed.item_thumb import BadgeType from 
tagstudio.qt.mixed.migration_modal import JsonMigrationModal +from tagstudio.qt.mixed.paths_to_fields import PathsToFieldsModal from tagstudio.qt.mixed.progress_bar import ProgressWidget from tagstudio.qt.mixed.settings_panel import SettingsPanel from tagstudio.qt.mixed.tag_color_manager import TagColorManager @@ -543,6 +544,15 @@ def create_folders_tags_modal(): create_folders_tags_modal ) + def create_paths_fields_modal(): + if not hasattr(self, "paths_fields_modal"): + self.paths_fields_modal = PathsToFieldsModal(self.lib, self) + self.paths_fields_modal.show() + + self.main_window.menu_bar.paths_to_fields_action.triggered.connect( + create_paths_fields_modal + ) + # endregion # region Help Menu ============================================================ @@ -769,6 +779,7 @@ def close_library(self, is_shutdown: bool = False): self.main_window.menu_bar.fix_dupe_files_action.setEnabled(False) self.main_window.menu_bar.clear_thumb_cache_action.setEnabled(False) self.main_window.menu_bar.folders_to_tags_action.setEnabled(False) + self.main_window.menu_bar.paths_to_fields_action.setEnabled(False) self.main_window.menu_bar.library_info_action.setEnabled(False) except AttributeError: logger.warning( @@ -1622,6 +1633,7 @@ def _init_library(self, path: Path, open_status: LibraryStatus): self.main_window.menu_bar.fix_dupe_files_action.setEnabled(True) self.main_window.menu_bar.clear_thumb_cache_action.setEnabled(True) self.main_window.menu_bar.folders_to_tags_action.setEnabled(True) + self.main_window.menu_bar.paths_to_fields_action.setEnabled(True) self.main_window.menu_bar.library_info_action.setEnabled(True) self.main_window.preview_panel.set_selection(self.selected) diff --git a/src/tagstudio/qt/views/main_window.py b/src/tagstudio/qt/views/main_window.py index df675fbe6..14c5f84b2 100644 --- a/src/tagstudio/qt/views/main_window.py +++ b/src/tagstudio/qt/views/main_window.py @@ -385,6 +385,11 @@ def setup_macros_menu(self): 
self.folders_to_tags_action.setEnabled(False) self.macros_menu.addAction(self.folders_to_tags_action) + # Paths → Fields + self.paths_to_fields_action = QAction(Translations["menu.macros.paths_to_fields"], self) + self.paths_to_fields_action.setEnabled(False) + self.macros_menu.addAction(self.paths_to_fields_action) + assign_mnemonics(self.macros_menu) self.addMenu(self.macros_menu) diff --git a/src/tagstudio/resources/translations/en.json b/src/tagstudio/resources/translations/en.json index edda02311..d6051f576 100644 --- a/src/tagstudio/resources/translations/en.json +++ b/src/tagstudio/resources/translations/en.json @@ -229,6 +229,7 @@ "menu.help.about": "About", "menu.help": "&Help", "menu.macros.folders_to_tags": "Folders to Tags", + "menu.macros.paths_to_fields": "Paths to Fields", "menu.macros": "&Macros", "menu.select": "Select", "menu.settings": "Settings...", @@ -246,6 +247,27 @@ "namespace.create.title": "Create Namespace", "namespace.new.button": "New Namespace", "namespace.new.prompt": "Create a New Namespace to Start Adding Custom Colors!", + "paths_to_fields.add_mapping": "Add Mapping", + "paths_to_fields.converting": "Converting paths to Fields", + "paths_to_fields.description": "Creates fields based on your file paths and applies them to your entries.\n The structure below shows all the fields that will be created and what entries they will be applied to.", + "paths_to_fields.field_key_placeholder": "field_key e.g. 
page_number", + "paths_to_fields.pattern_label": "File Path Pattern", + "paths_to_fields.preview": "Preview", + "paths_to_fields.preview_empty": "No Preview Available", + "paths_to_fields.preview.markers.apply": "apply", + "paths_to_fields.preview.markers.already_set": "skipped (already set)", + "paths_to_fields.preview.markers.duplicate": "skipped (duplicate)", + "paths_to_fields.progress.window_title": "Apply Fields", + "paths_to_fields.progress.label.initial": "Applying Field Updates...", + "paths_to_fields.mappings_label": "Field Mappings", + "paths_to_fields.msg.enter_pattern": "Please enter a regex pattern.", + "paths_to_fields.msg.add_mapping": "Please add at least one field mapping.", + "paths_to_fields.msg.invalid_regex_title": "Invalid Regex", + "paths_to_fields.msg.no_matches": "No matches found.", + "paths_to_fields.template_placeholder": "template e.g. $page or example.com/$id", + "paths_to_fields.title": "Create Fields From Paths", + "paths_to_fields.use_filename_only": "Use Filename Only", + "paths_to_fields.allow_existing": "Apply even when fields already have values", "preview.ignored": "Ignored", "preview.multiple_selection": "<b>{count}</b> Items Selected", "preview.no_selection": "No Items Selected", diff --git a/tests/macros/test_paths_to_fields.py b/tests/macros/test_paths_to_fields.py new file mode 100644 index 000000000..7eb0c8d1e --- /dev/null +++ b/tests/macros/test_paths_to_fields.py @@ -0,0 +1,211 @@ +# Copyright (C) 2025 +# Licensed under the GPL-3.0 License. 
# Created for TagStudio: https://github.com/CyanVoxel/TagStudio

"""Tests for the path-to-field helpers in ``tagstudio.qt.mixed.paths_to_fields``."""

from pathlib import Path

from tagstudio.core.library.alchemy.fields import FieldID
from tagstudio.core.library.alchemy.library import Library
from tagstudio.core.library.alchemy.models import Entry
from tagstudio.core.utils.types import unwrap
from tagstudio.qt.mixed.paths_to_fields import (
    PathFieldRule,
    apply_paths_to_fields,
    iter_preview_paths_to_fields,
    preview_paths_to_fields,
)


def _sorted_field_pairs(entry: Entry) -> list[tuple[str, str]]:
    """Return every ``(type_key, value)`` pair on *entry*, sorted.

    A list keeps repeated keys (a ``{type_key: value}`` dict would silently
    collapse them), so comparing it against an exact expectation also catches
    duplicate field assignments.
    """
    return sorted((field.type_key, field.value or "") for field in entry.fields)


def test_paths_to_fields_preview_and_apply(library: Library):
    """Preview three entries against three rules, apply, and verify every field."""
    folder = unwrap(library.folder)

    entries = [
        Entry(folder=folder, path=Path("series-MySeries/01_10.jpg"), fields=[]),
        Entry(folder=folder, path=Path("creator-jdoe/abc123_02.png"), fields=[]),
        Entry(
            folder=folder,
            path=Path("creator-jane/Some-Series_source-name_003.jpeg"),
            fields=[],
        ),
    ]
    ids = library.add_entries(entries)

    rules = [
        # series-{series}/{page}_{total}.ext
        PathFieldRule(
            pattern=r"^series-(?P<series>[^/]+)/(?P<page>\d+)_\d+\.[^.]+$",
            fields={
                FieldID.SERIES.name: "$series",
                "page_number": "$page",
            },
        ),
        # creator-{artist}/{source_ident}_{page}.ext -> artist + source URL
        PathFieldRule(
            pattern=r"^creator-(?P<artist>[^/]+)/(?P<source_ident>[^_]+)_(?P<page>\d+)\.[^.]+$",
            fields={
                FieldID.ARTIST.name: "$artist",
                FieldID.SOURCE.name: "example.com/abc/$source_ident",
            },
        ),
        # creator-{artist}/{series}_{source}_{page}.ext
        PathFieldRule(
            pattern=r"^creator-(?P<artist>[^/]+)/(?P<series>[^_]+)_(?P<source>[^_]+)_(?P<page>\d+)\.[^.]+$",
            fields={
                FieldID.ARTIST.name: "$artist",
                FieldID.SERIES.name: "$series",
                FieldID.SOURCE.name: "$source",
                "page_number": "$page",
            },
        ),
    ]

    preview = preview_paths_to_fields(library, rules)
    # should propose updates for all 3 entries
    assert len(preview) == 3

    applied = apply_paths_to_fields(library, preview, create_missing_field_types=True)
    # Each entry matches exactly one rule, so the assignments are
    # 2 (series rule) + 2 (artist/source rule) + 4 (full creator rule).
    assert applied == 8

    # Compare the complete field list of each entry (not a key -> value dict),
    # so an unexpected extra or duplicated field fails the test.
    e0 = unwrap(library.get_entry_full(ids[0]))
    assert _sorted_field_pairs(e0) == sorted(
        [
            (FieldID.SERIES.name, "MySeries"),
            ("page_number", "01"),
        ]
    )

    e1 = unwrap(library.get_entry_full(ids[1]))
    assert _sorted_field_pairs(e1) == sorted(
        [
            (FieldID.ARTIST.name, "jdoe"),
            (FieldID.SOURCE.name, "example.com/abc/abc123"),
        ]
    )

    e2 = unwrap(library.get_entry_full(ids[2]))
    assert _sorted_field_pairs(e2) == sorted(
        [
            (FieldID.ARTIST.name, "jane"),
            (FieldID.SERIES.name, "Some-Series"),
            (FieldID.SOURCE.name, "source-name"),
            ("page_number", "003"),
        ]
    )


def test_paths_to_fields_allows_duplicate_fields(library: Library):
    """A rule may map the same field key twice; both values are previewed and applied."""
    folder = unwrap(library.folder)

    entry = Entry(folder=folder, path=Path("multi-foo_bar.jpg"), fields=[])
    [eid] = library.add_entries([entry])

    rule = PathFieldRule(
        pattern=r"^multi-(?P<a>[^_]+)_(?P<b>[^.]+)\.[^.]+$",
        fields=[
            (FieldID.COMMENTS.name, "$a"),
            (FieldID.COMMENTS.name, "$b"),
        ],
    )

    preview = preview_paths_to_fields(library, [rule])
    assert len(preview) == 1
    # Should propose two updates for the same key, in order
    assert preview[0].updates == [
        (FieldID.COMMENTS.name, "foo"),
        (FieldID.COMMENTS.name, "bar"),
    ]

    applied = apply_paths_to_fields(library, preview, create_missing_field_types=True)
    assert applied == 2

    e = unwrap(library.get_entry_full(eid))
    comment_values = [f.value or "" for f in e.fields if f.type_key == FieldID.COMMENTS.name]
    assert sorted(comment_values) == ["bar", "foo"]


def test_apply_paths_to_fields_allow_existing_appends(library: Library):
    """``allow_existing`` decides whether an already-populated field gets a new value."""
    folder = unwrap(library.folder)

    entry = Entry(folder=folder, path=Path("existing/NewSeries-extra.jpg"), fields=[])
    [eid] = library.add_entries([entry])
    assert library.add_field_to_entry(eid, field_id=FieldID.SERIES.name, value="Existing Series")

    rule = PathFieldRule(
        pattern=r"^existing/(?P<series>[^-]+)-(?P<suffix>[^.]+)\.[^.]+$",
        fields=[(FieldID.SERIES.name, "$series")],
    )

    preview = preview_paths_to_fields(library, [rule], only_unset=False)
    assert preview, "Expected preview to include updates even when field already set"

    # With allow_existing=False nothing is written and the old value survives.
    applied_without = apply_paths_to_fields(
        library,
        preview,
        create_missing_field_types=True,
        allow_existing=False,
    )
    assert applied_without == 0
    entry_state = unwrap(library.get_entry_full(eid))
    values = [f.value or "" for f in entry_state.fields if f.type_key == FieldID.SERIES.name]
    assert values == ["Existing Series"]

    # With allow_existing=True the new value is added next to the old one.
    preview = preview_paths_to_fields(library, [rule], only_unset=False)
    applied_with = apply_paths_to_fields(
        library,
        preview,
        create_missing_field_types=True,
        allow_existing=True,
    )
    assert applied_with == 1
    entry_state = unwrap(library.get_entry_full(eid))
    values = [f.value or "" for f in entry_state.fields if f.type_key == FieldID.SERIES.name]
    assert sorted(values) == ["Existing Series", "NewSeries"]


def test_iter_preview_paths_to_fields_reports_progress(library: Library):
    """The preview iterator emits events carrying the library total and at least one update."""
    folder = unwrap(library.folder)

    entries = [
        Entry(folder=folder, path=Path("progress/alpha_01.jpg"), fields=[]),
        Entry(folder=folder, path=Path("progress/beta_02.jpg"), fields=[]),
    ]
    library.add_entries(entries)

    rule = PathFieldRule(
        pattern=r"^progress/(?P<name>[^_]+)_(?P<page>\d+)\.[^.]+$",
        fields=[
            (FieldID.SERIES.name, "$name"),
            ("page_number", "$page"),
        ],
    )

    events = list(iter_preview_paths_to_fields(library, [rule], only_unset=False))
    assert events, "Expected progress events to be emitted"

    totals = {evt.total for evt in events if evt.total is not None}
    assert totals == {library.entry_count()}
    assert any(evt.update is not None for evt in events)


def test_iter_preview_paths_to_fields_stop(library: Library):
    """Iteration ends after the first event once ``cancel_callback`` returns True."""
    folder = unwrap(library.folder)

    entries = [
        Entry(folder=folder, path=Path("stop/foo_01.jpg"), fields=[]),
        Entry(folder=folder, path=Path("stop/bar_02.jpg"), fields=[]),
    ]
    library.add_entries(entries)

    rule = PathFieldRule(
        pattern=r"^stop/(?P<stem>[^_]+)_(?P<page>\d+)\.[^.]+$",
        fields=[(FieldID.SERIES.name, "$stem")],
    )

    seen: list[int] = []

    def should_cancel() -> bool:
        # Request cancellation as soon as one event has been consumed.
        return len(seen) >= 1

    for evt in iter_preview_paths_to_fields(
        library,
        [rule],
        only_unset=False,
        cancel_callback=should_cancel,
    ):
        seen.append(evt.index)

    assert seen == [1]