-
Notifications
You must be signed in to change notification settings - Fork 4
/
import_cli.py
378 lines (301 loc) · 11.9 KB
/
import_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
"""Import prompt."""
import functools
import logging
from typing import Optional, Union
import pluggy
from rich import box
from rich.console import Group
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
import moe
import moe.cli
from moe import config
from moe.cli import console
from moe.library import Album, MetaAlbum, MetaTrack
from moe.moe_import.import_core import CandidateAlbum
from moe.util.cli import PromptChoice, choice_prompt
from moe.util.core import get_matching_tracks
log = logging.getLogger("moe.cli.import")
__all__ = ["candidate_prompt", "import_prompt"]
class AbortImport(Exception):
"""Used to abort the import process."""
class Hooks:
"""Import plugin cli hook specifications."""
@staticmethod
@moe.hookspec
def add_candidate_prompt_choice(prompt_choices: list[PromptChoice]):
"""Add a user input choice to the candidate prompt.
``func`` will be supplied the following keyword arguments:
* ``new_album``: Album being added to the library.
* ``candidates``: Candidate albums with all the import changes.
Args:
prompt_choices: List of prompt choices. To add a prompt choice, simply
append it to this list.
Example:
.. code:: python
prompt_choices.append(
PromptChoice(
title="Abort", shortcut_key="x", func=_abort_changes
)
)
"""
@staticmethod
@moe.hookspec
def add_import_prompt_choice(prompt_choices: list[PromptChoice]):
"""Add a user input choice to the import prompt.
``func`` will be supplied the following keyword arguments:
* ``new_album``: Album being added to the library.
* ``candidate``: Candidate album with all the import changes.
Args:
prompt_choices: List of prompt choices. To add a prompt choice, simply
append it to this list.
Example:
.. code:: python
prompt_choices.append(
PromptChoice(
title="Abort", shortcut_key="x", func=_abort_changes
)
)
"""
@moe.hookimpl
def add_hooks(pm: pluggy.manager.PluginManager):
"""Registers `import` cli hookspecs to Moe."""
from moe.moe_import.import_cli import Hooks
pm.add_hookspecs(Hooks)
@moe.hookimpl
def add_candidate_prompt_choice(prompt_choices: list[PromptChoice]):
"""Adds the ``abort`` prompt choice to the user prompt."""
prompt_choices.append(
PromptChoice(title="Abort", shortcut_key="x", func=_abort_changes)
)
@moe.hookimpl
def add_import_prompt_choice(prompt_choices: list[PromptChoice]):
"""Adds the ``apply`` and ``abort`` prompt choices to the user prompt."""
prompt_choices.append(
PromptChoice(title="Apply changes", shortcut_key="a", func=_apply_changes)
)
prompt_choices.append(
PromptChoice(title="Abort", shortcut_key="x", func=_abort_changes)
)
@moe.hookimpl
def process_candidates(new_album: Album, candidates: list[CandidateAlbum]):
"""Use the import prompt to select and process the imported candidate albums."""
if candidates:
try:
candidate_prompt(new_album, candidates[:5])
except AbortImport as err:
log.debug(err)
raise SystemExit(0) from err
def candidate_prompt(new_album: Album, candidates: list[CandidateAlbum]):
"""Runs the interactive prompt for a user to select a candidate to import.
Args:
new_album: Album being added to the library.
candidates: List of candidates to choose from.
Raises:
AbortImport: Import prompt was aborted by the user.
"""
prompt_choices: list[PromptChoice] = []
for num, candidate in enumerate(candidates, start=1):
prompt_choices.append(
PromptChoice(
_fmt_candidate_info(candidate),
str(num),
functools.partial(_select_candidate, candidate_num=num - 1),
)
)
config.CONFIG.pm.hook.add_candidate_prompt_choice(prompt_choices=prompt_choices)
prompt_choice = choice_prompt(
prompt_choices, question="Which album would you like to import?"
)
prompt_choice.func(new_album, candidates)
def _fmt_candidate_info(candidate: CandidateAlbum) -> str:
"""Formats a candidates info for the candidate prompt."""
sub_header_values = []
for str_field in ["media", "country", "label", "catalog_num"]:
if value := getattr(candidate.album, str_field):
sub_header_values.append(value)
sub_header_values.extend(candidate.sub_header_info)
sub_header = " | ".join(sub_header_values)
return (
str(candidate)
+ "\n"
+ " " * (9 + len(candidate.match_value_pct))
+ sub_header
+ "\n"
+ " " * (9 + len(candidate.match_value_pct))
+ candidate.source_str
+ "\n"
)
def _select_candidate(
new_album: Album, candidates: list[CandidateAlbum], candidate_num: int
):
"""Runs the import prompt for a selected candidate."""
import_prompt(new_album, candidates[candidate_num])
def import_prompt(
new_album: Album,
candidate: CandidateAlbum,
):
"""Runs the interactive prompt for the given album changes.
Args:
new_album: Album being added to the library. Any changes will be applied to
this album.
candidate: New candidate album with all metadata changes. Will be compared
against ``old_album``.
Raises:
AbortImport: Import prompt was aborted by the user.
"""
log.debug(f"Running import prompt. [{new_album=!r}, {candidate=!r}]")
console.print(_fmt_import_updates(new_album, candidate))
prompt_choices: list[PromptChoice] = []
config.CONFIG.pm.hook.add_import_prompt_choice(prompt_choices=prompt_choices)
prompt_choice = choice_prompt(prompt_choices)
prompt_choice.func(new_album, candidate)
def _apply_changes(
new_album: Album,
candidate: CandidateAlbum,
):
"""Applies the album changes."""
log.debug("Applying changes from import prompt.")
for old_track, new_track in get_matching_tracks(new_album, candidate.album):
if not old_track and new_track:
candidate.album.tracks.remove(new_track) # missing track
elif old_track and not new_track:
new_album.tracks.remove(
new_album.get_track(old_track.track_num, old_track.disc)
) # unmatched track
elif (
old_track
and new_track
and new_album.get_track(new_track.track_num, new_track.disc) != old_track
):
# matchup track and disc numbers of matches to ensure they merge properly
old_track.track_num = new_track.track_num
old_track.disc = new_track.disc
new_album.merge(candidate.album, overwrite=True)
def _abort_changes(
new_album: Album,
candidate: CandidateAlbum,
):
"""Aborts the album changes."""
raise AbortImport("Import prompt aborted; no changes made.")
def _fmt_import_updates(new_album: Album, candidate: CandidateAlbum) -> Panel:
"""Formats import updates between `new_album` and the `candidate`."""
album_text = _fmt_album(new_album, candidate)
track_text = _fmt_tracks(new_album, candidate)
return Panel(
Group(album_text, track_text),
title="Import Updates",
border_style="light_cyan1",
)
def _fmt_album(new_album: Album, candidate: CandidateAlbum) -> Text:
"""Formats the header for the import panel."""
header_text = Text(justify="center", style="bold")
for header_field in ("title", "artist"):
field_changes = _fmt_field_changes(new_album, candidate.album, header_field)
if not field_changes:
field_changes = Text(getattr(new_album, header_field))
header_text.append_text(field_changes).append("\n")
sub_header_text = Text()
for sub_header in ("media", "country", "label", "catalog_num"):
field_changes = _fmt_field_changes(new_album, candidate.album, sub_header)
if not field_changes:
if getattr(new_album, sub_header):
field_changes = Text(getattr(new_album, sub_header))
if sub_header_text and field_changes:
sub_header_text.append(" | ")
if field_changes:
sub_header_text.append_text(field_changes)
return (
header_text.append_text(sub_header_text)
.append("\n")
.append(candidate.source_str)
.append("\n")
)
def _fmt_tracks(new_album: Album, candidate: CandidateAlbum) -> Table:
"""Formats the tracklist for the import panel."""
track_table = Table(box=box.SIMPLE)
track_table.add_column("status")
track_table.add_column("disc")
track_table.add_column("#")
track_table.add_column("Artist")
track_table.add_column("Title")
matches = get_matching_tracks(new_album, candidate.album)
matches.sort(
key=lambda match: (
getattr(match[1], "disc", 0),
getattr(match[1], "track_num", 0),
)
) # sort by new track's disc then track number
unmatched_tracks: list[MetaTrack] = []
missing_tracks: list[MetaTrack] = []
for old_track, new_track in matches:
if old_track and new_track:
track_table.add_row(
Text("matched", style="green"),
_fmt_field_changes(old_track, new_track, "disc"),
_fmt_field_changes(old_track, new_track, "track_num"),
_fmt_field_changes(old_track, new_track, "artist"),
_fmt_field_changes(old_track, new_track, "title"),
)
elif old_track and not new_track:
unmatched_tracks.append(old_track)
elif not old_track and new_track:
missing_tracks.append(new_track)
if track_table.rows:
track_table.rows[-1].end_section = True
for missing_track in sorted(missing_tracks):
track_table.add_row(
Text("missing", style="red"),
str(missing_track.disc),
str(missing_track.track_num),
missing_track.artist,
missing_track.title,
)
for unmatched_track in sorted(unmatched_tracks):
track_table.add_row(
Text("unmatched", style="red"),
str(unmatched_track.disc),
str(unmatched_track.track_num),
unmatched_track.artist,
unmatched_track.title,
)
return track_table
def _fmt_field_changes(
old_item: Union[MetaAlbum, MetaTrack],
new_item: Union[MetaAlbum, MetaTrack],
field: str,
) -> Optional[Text]:
"""Formats changes of a single field.
Args:
old_item: Old track or album to compare.
new_item: New track or album to compare.
field: Field to compare.
Returns:
A rich 'Text' based on the following cases:
1. `old_item.field` DNE and `new_item.field` exists:
`new_item.field` will be returned in green
2. `old_item.field` exists and `new_item.field` DNE:
`old_item.field` will be striketroughed and in red
3. `old_item.field` differs from `new_item.field`:
"{old_item.field} -> {new_item.field}" and the arrow is yellow
4. `old_item.field` is equal to `new_item.field`:
`old_item.field` returned
5. None if `old_item.field` and `new_item.field` DNE
"""
old_field = getattr(old_item, field)
new_field = getattr(new_item, field)
if not old_field and new_field:
return Text(str(new_field), style="green")
if old_field and not new_field:
return Text(str(old_field), style="red strike")
if old_field != new_field:
return (
Text(str(old_field))
.append(Text(" -> ", style="yellow"))
.append(Text(str(new_field)))
)
if old_field:
return Text(str(old_field))
else:
return None