-
Notifications
You must be signed in to change notification settings - Fork 4
/
move_core.py
350 lines (253 loc) · 10.8 KB
/
move_core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""Core api for moving items."""
import logging
import re
import shutil
from contextlib import suppress
from pathlib import Path
from typing import Callable, Optional, Union
import dynaconf
import pluggy
from unidecode import unidecode
import moe
from moe import config
from moe.library import Album, Extra, LibItem, Track
__all__ = ["copy_item", "fmt_item_path", "move_item"]
log = logging.getLogger("moe.move")
class Hooks:
"""Move plugin hook specifications."""
@staticmethod
@moe.hookspec
def create_path_template_func() -> list[Callable]: # type: ignore
"""Create a custom path template function.
Any functions returned by this hook will be available to be called from within
the path templates defined by the move plugin.
Returns:
A list of all custom path functions your plugin creates. The list should
contain the callable functions themselves.
""" # noqa: DAR202
@moe.hookimpl
def add_hooks(pm: pluggy.manager.PluginManager):
"""Registers `add` hookspecs to Moe."""
from moe.move.move_core import Hooks
pm.add_hookspecs(Hooks)
@moe.hookimpl
def add_config_validator(settings: dynaconf.base.LazySettings):
"""Validate move plugin configuration settings."""
default_album_path = "{album.artist}/{album.title} ({album.year})"
default_extra_path = "{e_unique(extra)}"
default_track_path = (
"{f'Disc {track.disc:02}' if album.disc_total > 1 else ''}/"
"{track.track_num:02} - {track.title}{track.path.suffix}"
)
validators = [
dynaconf.Validator("MOVE.ASCIIFY_PATHS", default=False),
dynaconf.Validator("MOVE.ALBUM_PATH", default=default_album_path),
dynaconf.Validator("MOVE.EXTRA_PATH", default=default_extra_path),
dynaconf.Validator("MOVE.TRACK_PATH", default=default_track_path),
]
settings.validators.register(*validators)
@moe.hookimpl(trylast=True)
def edit_new_items(items: list[LibItem]):
"""Copies and formats the path of an item after it has been added to the library."""
for item in items:
copy_item(item)
@moe.hookimpl
def create_path_template_func() -> list[Callable]:
"""Adds custom functions for the path templates."""
return [e_unique]
def e_unique(extra: Extra) -> str:
"""Returns a unique filename for an extra within its album."""
extra_names = [album_extra.path.name for album_extra in extra.album.extras]
if (name_count := extra_names.count(extra.path.name)) > 1:
return extra.path.stem + f" ({name_count - 1})" + extra.path.suffix
return extra.path.name
########################################################################################
# Format paths
########################################################################################
def fmt_item_path(item: LibItem, lib_path: Optional[Path] = None) -> Path:
"""Returns a formatted item path according to the user configuration.
Args:
item: Item whose path will be formatted.
lib_path: Optional library path the outputted path will be relative to. By
default, this will be the ``library_path`` setting in the config.
Returns:
A formatted path as defined by the ``{album/extra/track}_path`` config template
settings relative to ``lib_path``.
"""
log.debug(f"Formatting item path. [path={item.path}]")
lib_path = lib_path or Path(config.CONFIG.settings.library_path).expanduser()
if isinstance(item, Album):
new_path = _fmt_album_path(item, lib_path)
elif isinstance(item, Extra):
new_path = _fmt_extra_path(item, lib_path)
else:
assert isinstance(item, Track)
new_path = _fmt_track_path(item, lib_path)
if config.CONFIG.settings.move.asciify_paths:
new_path = Path(unidecode(str(new_path)))
log.debug(f"Formatted item path. [path={new_path}]")
return new_path
def _fmt_album_path(album: Album, lib_path: Path) -> Path:
"""Returns a formatted album directory according to the user configuration."""
album_path = _eval_path_template(config.CONFIG.settings.move.album_path, album)
return lib_path / album_path
def _fmt_extra_path(extra: Extra, lib_path: Path) -> Path:
"""Returns a formatted extra path according to the user configuration."""
album_path = _fmt_album_path(extra.album, lib_path)
extra_path = _eval_path_template(config.CONFIG.settings.move.extra_path, extra)
return album_path / extra_path
def _fmt_track_path(track: Track, lib_path: Path) -> Path:
"""Returns a formatted track path according to the user configuration."""
album_path = _fmt_album_path(track.album, lib_path)
track_path = _eval_path_template(config.CONFIG.settings.move.track_path, track)
return album_path / track_path
def _eval_path_template(template, lib_item) -> str:
"""Evaluates and sanitizes a path template.
Args:
template: Path template.
See `_lazy_fstr_item()` for more info on accepted f-string templates.
lib_item: Library item associated with the template.
Returns:
Evaluated path.
"""
template_parts = template.split("/")
sanitized_parts = []
for template_part in template_parts:
path_part = _lazy_fstr_item(template_part, lib_item)
sanitized_part = _sanitize_path_part(path_part)
if sanitized_part:
sanitized_parts.append(sanitized_part)
return "/".join(sanitized_parts)
def _lazy_fstr_item(template: str, lib_item: LibItem) -> str:
"""Evalutes the given f-string template for a specific library item.
Args:
template: f-string template to evaluate.
All library items should have their own template and refer to variables as:
Album: album (e.g. {album.title}, {album.artist})
Track: track (e.g. {track.title}, {track.artist})
Extra: extra (e.g. {extra.path.name}
lib_item: Library item referenced in the template.
Example:
The default path template for an album is::
{album.artist}/{album.title} ({album.year})
Returns:
Evaluated f-string.
Raises:
NotImplementedError: You discovered a new library item!
"""
# add the appropriate library item to the scope
if isinstance(lib_item, Album):
album = lib_item # noqa: F841
elif isinstance(lib_item, Track):
track = lib_item # noqa: F841
album = track.album
elif isinstance(lib_item, Extra):
extra = lib_item # noqa: F841
album = extra.album # noqa: F841
else:
raise NotImplementedError
plugin_funcs = config.CONFIG.pm.hook.create_path_template_func()
for funcs in plugin_funcs:
for func in funcs:
globals()[func.__name__] = func
return eval(f'f"""{template}"""')
def _sanitize_path_part(path_part: str) -> str:
"""Sanitizes a part of a path to be compatible with most filesystems.
Note:
Only sub-paths of the library path will be affected.
Args:
path_part: Path part to sanitize. Must be a single 'part' of a path, i.e. no /
separators.
Returns:
Path part with all the replacements applied.
"""
PATH_REPLACE_CHARS = {
r"^\.": "_", # leading '.' (hidden files on Unix)
r'[<>:"\?\*\|\\/]': "_", # <, >, : , ", ?, *, |, \, / (Windows reserved chars)
r"\.$": "_", # trailing '.' (Windows restriction)
r"\s+$": "", # trailing whitespace (Windows restriction)
}
for regex, replacement in PATH_REPLACE_CHARS.items():
path_part = re.sub(regex, replacement, path_part)
return path_part
########################################################################################
# Copy
########################################################################################
def copy_item(item: LibItem):
"""Copies an item to a destination as determined by the user configuration.
Overwrites any existing files. Will create the destination if it does not already
exist.
"""
if isinstance(item, Album):
_copy_album(item)
elif isinstance(item, (Extra, Track)):
_copy_file_item(item)
def _copy_album(album: Album):
"""Copies an album to a destination as determined by the user configuration."""
dest = fmt_item_path(album)
log.debug(f"Copying album. [{dest=}, {album=!r}]")
dest.mkdir(parents=True, exist_ok=True)
album.path = dest
for track in album.tracks:
_copy_file_item(track)
for extra in album.extras:
_copy_file_item(extra)
log.info(f"Album copied. [{dest=}, {album=!r}]")
def _copy_file_item(item: Union[Extra, Track]):
"""Copies an extra or track to a destination as determined by the user config."""
dest = fmt_item_path(item)
if dest.exists() and dest.samefile(item.path):
item.path = dest
return
log.debug(f"Copying item. [{dest=}, {item=!r}]")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(item.path, dest)
item.path = dest
log.info(f"Copied item. [{dest=}, {item=!r}]")
########################################################################################
# Move
########################################################################################
def move_item(item: LibItem):
"""Moves an item to a destination as determined by the user configuration.
Overwrites any existing files. Will create the destination if it does not already
exist.
"""
if isinstance(item, Album):
_move_album(item)
elif isinstance(item, (Extra, Track)):
_move_file_item(item)
def _move_album(album: Album):
"""Moves an album to a given destination.
Note:
Empty leftover directories will be removed.
"""
dest = fmt_item_path(album)
old_album_dir = album.path
log.debug(f"Moving album. [{dest=}, {album=!r}]")
dest.mkdir(parents=True, exist_ok=True)
album.path = dest
for track in album.tracks:
_move_file_item(track)
for extra in album.extras:
_move_file_item(extra)
# remove any empty leftover directories
for old_child in old_album_dir.rglob("*"):
with suppress(OSError):
old_child.rmdir()
with suppress(OSError):
old_album_dir.rmdir()
for old_parent in old_album_dir.parents:
with suppress(OSError):
old_parent.rmdir()
log.info(f"Moved album. [{dest=}, {album=!r}]")
def _move_file_item(item: Union[Extra, Track]):
"""Moves an extra or track to a destination as determined by the user config."""
dest = fmt_item_path(item)
if dest.exists() and dest.samefile(item.path):
item.path = dest
return
log.debug(f"Moving item. [{dest=}, {item=!r}]")
dest.parent.mkdir(parents=True, exist_ok=True)
item.path.replace(dest)
item.path = dest
log.info(f"Moved item. [{dest=}, {item=!r}]")