25
25
import socket
26
26
import time
27
27
import traceback
28
+ from functools import cache
28
29
from string import ascii_lowercase
30
+ from typing import TYPE_CHECKING
29
31
30
32
import confuse
31
33
from discogs_client import Client , Master , Release
40
42
from beets .plugins import BeetsPlugin , MetadataSourcePlugin , get_distance
41
43
from beets .util .id_extractors import extract_release_id
42
44
45
+ if TYPE_CHECKING :
46
+ from collections .abc import Callable , Iterable
47
+
48
+ from beets .library import Item
49
+
43
50
USER_AGENT = f"beets/{ beets .__version__ } +https://beets.io/"
44
51
API_KEY = "rAzVUQYRaoFjeBjyWuWZ"
45
52
API_SECRET = "plxtUTqoCzwxZpqdPysCwGuBSmZNdZVy"
54
61
)
55
62
56
63
64
+ TRACK_INDEX_RE = re .compile (
65
+ r"""
66
+ (.*?) # medium: everything before medium_index.
67
+ (\d*?) # medium_index: a number at the end of
68
+ # `position`, except if followed by a subtrack index.
69
+ # subtrack_index: can only be matched if medium
70
+ # or medium_index have been matched, and can be
71
+ (
72
+ (?<=\w)\.[\w]+ # a dot followed by a string (A.1, 2.A)
73
+ | (?<=\d)[A-Z]+ # a string that follows a number (1A, B2a)
74
+ )?
75
+ """ ,
76
+ re .VERBOSE ,
77
+ )
78
+
79
+
57
80
class ReleaseFormat (TypedDict ):
58
81
name : str
59
82
qty : int
@@ -73,6 +96,7 @@ def __init__(self):
73
96
"separator" : ", " ,
74
97
"index_tracks" : False ,
75
98
"append_style_genre" : False ,
99
+ "search_limit" : 5 ,
76
100
}
77
101
)
78
102
self .config ["apikey" ].redact = True
@@ -156,111 +180,37 @@ def track_distance(self, item, track_info):
156
180
data_source = "Discogs" , info = track_info , config = self .config
157
181
)
158
182
159
- def candidates (self , items , artist , album , va_likely ):
160
- """Returns a list of AlbumInfo objects for discogs search results
161
- matching an album and artist (if not various).
162
- """
163
- if not album and not artist :
164
- self ._log .debug (
165
- "Skipping Discogs query. Files missing album and artist tags."
166
- )
167
- return []
168
-
169
- if va_likely :
170
- query = album
171
- else :
172
- query = f"{ artist } { album } "
173
- try :
174
- return self .get_albums (query )
175
- except DiscogsAPIError as e :
176
- self ._log .debug ("API Error: {0} (query: {1})" , e , query )
177
- if e .status_code == 401 :
178
- self .reset_auth ()
179
- return self .candidates (items , artist , album , va_likely )
180
- else :
181
- return []
182
- except CONNECTION_ERRORS :
183
- self ._log .debug ("Connection error in album search" , exc_info = True )
184
- return []
185
-
186
- def get_track_from_album_by_title (
187
- self , album_info , title , dist_threshold = 0.3
188
- ):
189
- def compare_func (track_info ):
190
- track_title = getattr (track_info , "title" , None )
191
- dist = string_dist (track_title , title )
192
- return track_title and dist < dist_threshold
193
-
194
- return self .get_track_from_album (album_info , compare_func )
195
-
196
- def get_track_from_album (self , album_info , compare_func ):
197
- """Return the first track of the release where `compare_func` returns
198
- true.
199
-
200
- :return: TrackInfo object.
201
- :rtype: beets.autotag.hooks.TrackInfo
202
- """
203
- if not album_info :
183
+ def candidates (
184
+ self , items : list [Item ], artist : str , album : str , va_likely : bool
185
+ ) -> Iterable [AlbumInfo ]:
186
+ return self .get_albums (f"{ artist } { album } " if va_likely else album )
187
+
188
+ def get_track_from_album (
189
+ self , album_info : AlbumInfo , compare : Callable [[TrackInfo ], float ]
190
+ ) -> TrackInfo | None :
191
+ """Return the best matching track of the release."""
192
+ scores_and_tracks = [(compare (t ), t ) for t in album_info .tracks ]
193
+ score , track_info = min (scores_and_tracks , key = lambda x : x [0 ])
194
+ if score > 0.3 :
204
195
return None
205
196
206
- for track_info in album_info .tracks :
207
- # check for matching position
208
- if not compare_func ( track_info ):
209
- continue
197
+ track_info [ "artist" ] = album_info .artist
198
+ track_info [ "artist_id" ] = album_info . artist_id
199
+ track_info [ "album" ] = album_info . album
200
+ return track_info
210
201
211
- # attach artist info if not provided
212
- if not track_info ["artist" ]:
213
- track_info ["artist" ] = album_info .artist
214
- track_info ["artist_id" ] = album_info .artist_id
215
- # attach album info
216
- track_info ["album" ] = album_info .album
202
+ def item_candidates (
203
+ self , item : Item , artist : str , title : str
204
+ ) -> Iterable [TrackInfo ]:
205
+ albums = self .candidates ([item ], artist , title , False )
217
206
218
- return track_info
207
+ def compare_func (track_info : TrackInfo ) -> float :
208
+ return string_dist (track_info .title , title )
219
209
220
- return None
221
-
222
- def item_candidates (self , item , artist , title ):
223
- """Returns a list of TrackInfo objects for Search API results
224
- matching ``title`` and ``artist``.
225
- :param item: Singleton item to be matched.
226
- :type item: beets.library.Item
227
- :param artist: The artist of the track to be matched.
228
- :type artist: str
229
- :param title: The title of the track to be matched.
230
- :type title: str
231
- :return: Candidate TrackInfo objects.
232
- :rtype: list[beets.autotag.hooks.TrackInfo]
233
- """
234
- if not artist and not title :
235
- self ._log .debug (
236
- "Skipping Discogs query. File missing artist and title tags."
237
- )
238
- return []
239
-
240
- query = f"{ artist } { title } "
241
- try :
242
- albums = self .get_albums (query )
243
- except DiscogsAPIError as e :
244
- self ._log .debug ("API Error: {0} (query: {1})" , e , query )
245
- if e .status_code == 401 :
246
- self .reset_auth ()
247
- return self .item_candidates (item , artist , title )
248
- else :
249
- return []
250
- except CONNECTION_ERRORS :
251
- self ._log .debug ("Connection error in track search" , exc_info = True )
252
- candidates = []
253
- for album_cur in albums :
254
- self ._log .debug ("searching within album {0}" , album_cur .album )
255
- track_result = self .get_track_from_album_by_title (
256
- album_cur , item ["title" ]
257
- )
258
- if track_result :
259
- candidates .append (track_result )
260
- # first 10 results, don't overwhelm with options
261
- return candidates [:10 ]
210
+ tracks = (self .get_track_from_album (a , compare_func ) for a in albums )
211
+ return list (filter (None , tracks ))
262
212
263
- def album_for_id (self , album_id ) :
213
+ def album_for_id (self , album_id : str ) -> AlbumInfo | None :
264
214
"""Fetches an album by its Discogs ID and returns an AlbumInfo object
265
215
or None if the album is not found.
266
216
"""
@@ -291,7 +241,15 @@ def album_for_id(self, album_id):
291
241
return None
292
242
return self .get_album_info (result )
293
243
294
- def get_albums (self , query ):
244
+ def track_for_id (self , track_id : str ) -> TrackInfo | None :
245
+ if album := self .album_for_id (track_id ):
246
+ for track in album .tracks :
247
+ if track .track_id == track_id :
248
+ return track
249
+
250
+ return None
251
+
252
+ def get_albums (self , query : str ) -> Iterable [AlbumInfo ]:
295
253
"""Returns a list of AlbumInfo objects for a discogs search query."""
296
254
# Strip non-word characters from query. Things like "!" and "-" can
297
255
# cause a query to return no results, even if they match the artist or
@@ -303,29 +261,28 @@ def get_albums(self, query):
303
261
query = re .sub (r"(?i)\b(CD|disc|vinyl)\s*\d+" , "" , query )
304
262
305
263
try :
306
- releases = self .discogs_client .search (query , type = "release" ).page (1 )
307
-
264
+ results = self .discogs_client .search (query , type = "release" )
265
+ results .per_page = self .config ["search_limit" ].as_number ()
266
+ releases = results .page (1 )
308
267
except CONNECTION_ERRORS :
309
268
self ._log .debug (
310
269
"Communication error while searching for {0!r}" ,
311
270
query ,
312
271
exc_info = True ,
313
272
)
314
273
return []
315
- return [
316
- album for album in map (self .get_album_info , releases [:5 ]) if album
317
- ]
274
+ return map (self .get_album_info , releases )
318
275
319
- def get_master_year (self , master_id ):
276
+ @cache
277
+ def get_master_year (self , master_id : str ) -> int | None :
320
278
"""Fetches a master release given its Discogs ID and returns its year
321
279
or None if the master release is not found.
322
280
"""
323
- self ._log .debug ("Searching for master release {0}" , master_id )
281
+ self ._log .debug ("Getting master release {0}" , master_id )
324
282
result = Master (self .discogs_client , {"id" : master_id })
325
283
326
284
try :
327
- year = result .fetch ("year" )
328
- return year
285
+ return result .fetch ("year" )
329
286
except DiscogsAPIError as e :
330
287
if e .status_code != 404 :
331
288
self ._log .debug (
@@ -695,33 +652,21 @@ def get_track_info(self, track, index, divisions):
695
652
medium_index = medium_index ,
696
653
)
697
654
698
- def get_track_index (self , position ):
655
+ @staticmethod
656
+ def get_track_index (
657
+ position : str ,
658
+ ) -> tuple [str | None , str | None , str | None ]:
699
659
"""Returns the medium, medium index and subtrack index for a discogs
700
660
track position."""
701
661
# Match the standard Discogs positions (12.2.9), which can have several
702
662
# forms (1, 1-1, A1, A1.1, A1a, ...).
703
- match = re .match (
704
- r"^(.*?)" # medium: everything before medium_index.
705
- r"(\d*?)" # medium_index: a number at the end of
706
- # `position`, except if followed by a subtrack
707
- # index.
708
- # subtrack_index: can only be matched if medium
709
- # or medium_index have been matched, and can be
710
- r"((?<=\w)\.[\w]+" # - a dot followed by a string (A.1, 2.A)
711
- r"|(?<=\d)[A-Z]+" # - a string that follows a number (1A, B2a)
712
- r")?"
713
- r"$" ,
714
- position .upper (),
715
- )
716
-
717
- if match :
663
+ medium = index = subindex = None
664
+ if match := TRACK_INDEX_RE .fullmatch (position .upper ()):
718
665
medium , index , subindex = match .groups ()
719
666
720
667
if subindex and subindex .startswith ("." ):
721
668
subindex = subindex [1 :]
722
- else :
723
- self ._log .debug ("Invalid position: {0}" , position )
724
- medium = index = subindex = None
669
+
725
670
return medium or None , index or None , subindex or None
726
671
727
672
def get_track_length (self , duration ):
0 commit comments