Skip to content

Commit

Permalink
Add more type annotations and mark functions as private
Browse files Browse the repository at this point in the history
  • Loading branch information
geigerzaehler committed Mar 8, 2024
1 parent 1032756 commit b17c5fc
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 55 deletions.
113 changes: 61 additions & 52 deletions beetsplug/alternatives.py
Expand Up @@ -22,10 +22,11 @@

import beets
from beets import art, util
from beets.library import Item, parse_query_string
from beets.library import Item, Library, parse_query_string
from beets.plugins import BeetsPlugin
from beets.ui import Subcommand, UserError, decargs, get_path_formats, input_yn, print_
from beets.util import FilesystemError, bytestring_path, displayable_path, syspath
from typing_extensions import override

import beetsplug.convert as convert

Expand All @@ -52,7 +53,7 @@ def __init__(self):
def commands(self):
return [AlternativesCommand(self)]

def update(self, lib, options):
def update(self, lib: Library, options):
try:
alt = self.alternative(options.name, lib)
except KeyError as e:
Expand All @@ -72,7 +73,7 @@ def list_tracks(self, lib, options):
if alt.path_key in item:
print_(format(item))

def alternative(self, name, lib):
def alternative(self, name: str, lib: Library):
conf = self.config[name]
if not conf.exists():
raise KeyError(name)
Expand Down Expand Up @@ -183,7 +184,7 @@ def parse_config(self, config):
dir = os.path.join(self.lib.directory, dir)
self.directory = dir

def item_change_actions(self, item, path, dest):
def item_change_actions(self, item: Item, path: bytes, dest: bytes) -> List[Action]:
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""
Expand All @@ -207,21 +208,21 @@ def item_change_actions(self, item, path, dest):

return actions

def matched_item_action(self, item):
path = self.get_path(item)
def _matched_item_action(self, item: Item) -> List[Action]:
path = self._get_stored_path(item)
if path and os.path.lexists(syspath(path)):
dest = self.destination(item)
_, path_ext = os.path.splitext(path)
_, dest_ext = os.path.splitext(dest)
if not path_ext == dest_ext:
# formats config option changed
return (item, [Action.REMOVE, Action.ADD])
return [Action.REMOVE, Action.ADD]
else:
return (item, self.item_change_actions(item, path, dest))
return self.item_change_actions(item, path, dest)
else:
return (item, [Action.ADD])
return [Action.ADD]

def items_actions(self) -> Iterator[Tuple[Item, List[Action]]]:
def _items_actions(self) -> Iterator[Tuple[Item, List[Action]]]:
matched_ids = set()
for album in self.lib.albums():
if self.query.match(album):
Expand All @@ -230,11 +231,11 @@ def items_actions(self) -> Iterator[Tuple[Item, List[Action]]]:

for item in self.lib.items():
if item.id in matched_ids or self.query.match(item):
yield self.matched_item_action(item)
elif self.get_path(item):
yield (item, self._matched_item_action(item))
elif self._get_stored_path(item):
yield (item, [Action.REMOVE])

def ask_create(self, create=None):
def ask_create(self, create: Optional[bool] = None) -> bool:
if not self.removable:
return True
if create is not None:
Expand All @@ -249,15 +250,15 @@ def ask_create(self, create=None):
)
return input_yn(msg, require=True)

def update(self, create=None):
def update(self, create: Optional[bool] = None):
if not os.path.isdir(syspath(self.directory)) and not self.ask_create(create):
print_("Skipping creation of {0}".format(displayable_path(self.directory)))
return

converter = self.converter()
for item, actions in self.items_actions():
converter = self._converter()
for item, actions in self._items_actions():
dest = self.destination(item)
path = self.get_path(item)
path = self._get_stored_path(item)
for action in actions:
if action == Action.MOVE:
print_(
Expand All @@ -269,53 +270,56 @@ def update(self, create=None):
util.move(path, dest)
assert path is not None
util.prune_dirs(os.path.dirname(path), root=self.directory)
self.set_path(item, dest)
self._set_stored_path(item, dest)
item.store()
path = dest
elif action == Action.WRITE:
print_("*{0}".format(displayable_path(path)))
item.write(path=path)
elif action == Action.SYNC_ART:
print_("~{0}".format(displayable_path(path)))
self.sync_art(item, path)
assert path is not None
self._sync_art(item, path)
elif action == Action.ADD:
print_("+{0}".format(displayable_path(dest)))
converter.submit(item)
elif action == Action.REMOVE:
print_("-{0}".format(displayable_path(path)))
self.remove_item(item)
self._remove_file(item)
item.store()

for item, dest in converter.as_completed():
self.set_path(item, dest)
self._set_stored_path(item, dest)
item.store()
converter.shutdown()

def destination(self, item: Item) -> bytes:
"""Returns the path for `item` in the external collection."""
path = item.destination(basedir=self.directory, path_formats=self.path_formats)
assert isinstance(path, bytes)
return path

def set_path(self, item, path: bytes):
def _set_stored_path(self, item: Item, path: bytes):
item[self.path_key] = str(path, "utf8")

@staticmethod
def _get_path(item, path_key) -> Optional[bytes]:
def _get_stored_path(self, item: Item) -> Optional[bytes]:
try:
return item[path_key].encode("utf8")
path = item[self.path_key]
except KeyError:
return None
if path:
return path.encode("utf8")
else:
return None

def get_path(self, item) -> Optional[bytes]:
return self._get_path(item, self.path_key)

def remove_item(self, item):
path = self.get_path(item)
def _remove_file(self, item: Item):
"""Remove the external file for `item`."""
path = self._get_stored_path(item)
_remove(path)
util.prune_dirs(path, root=self.directory)
del item[self.path_key]

def converter(self):
def _converter(self) -> "Worker":
def _convert(item):
dest = self.destination(item)
util.mkdirall(dest)
Expand All @@ -324,8 +328,8 @@ def _convert(item):

return Worker(_convert, self.max_workers)

def sync_art(self, item, path):
"""Embed artwork in the destination file."""
def _sync_art(self, item: Item, path: bytes):
"""Embed artwork in the file at `path`."""
album = item.get_album()
if album:
if album.artpath and os.path.isfile(syspath(album.artpath)):
Expand All @@ -347,35 +351,37 @@ def __init__(self, log, name, formats, lib, config):
self.formats = [convert.ALIASES.get(f, f) for f in formats]
self.convert_cmd, self.ext = convert.get_format(self.formats[0])

def converter(self):
@override
def _converter(self) -> "Worker":
fs_lock = threading.Lock()

def _convert(item):
dest = self.destination(item)
with fs_lock:
util.mkdirall(dest)

if self.should_transcode(item):
if self._should_transcode(item):
self._encode(self.convert_cmd, item.path, dest)
# Don't rely on the converter to write correct/complete tags.
item.write(path=dest)
else:
self._log.debug("copying {0}".format(displayable_path(dest)))
util.copy(item.path, dest, replace=True)
if self._embed:
self.sync_art(item, dest)
self._sync_art(item, dest)
return item, dest

return Worker(_convert, self.max_workers)

def destination(self, item: Item):
dest = super(ExternalConvert, self).destination(item)
if self.should_transcode(item):
@override
def destination(self, item: Item) -> bytes:
dest = super().destination(item)
if self._should_transcode(item):
return os.path.splitext(dest)[0] + b"." + self.ext
else:
return dest

def should_transcode(self, item):
def _should_transcode(self, item):
return item.format.lower() not in self.formats


Expand All @@ -385,6 +391,7 @@ class SymlinkType(Enum):


class SymlinkView(External):
@override
def parse_config(self, config):
if "query" not in config:
config["query"] = "" # This is a TrueQuery()
Expand All @@ -398,7 +405,8 @@ def parse_config(self, config):

super(SymlinkView, self).parse_config(config)

def item_change_actions(self, item, path, dest):
@override
def item_change_actions(self, item: Item, path: bytes, dest: bytes):
"""Returns the necessary actions for items that were previously in the
external collection, but might require metadata updates.
"""
Expand All @@ -413,32 +421,33 @@ def item_change_actions(self, item, path, dest):

return actions

@override
def update(self, create=None):
for item, actions in self.items_actions():
for item, actions in self._items_actions():
dest = self.destination(item)
path = self.get_path(item)
path = self._get_stored_path(item)
for action in actions:
if action == Action.MOVE:
print_(
">{0} -> {1}".format(
displayable_path(path), displayable_path(dest)
)
)
self.remove_item(item)
self.create_symlink(item)
self.set_path(item, dest)
self._remove_file(item)
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.ADD:
print_("+{0}".format(displayable_path(dest)))
self.create_symlink(item)
self.set_path(item, dest)
self._create_symlink(item)
self._set_stored_path(item, dest)
elif action == Action.REMOVE:
print_("-{0}".format(displayable_path(path)))
self.remove_item(item)
self._remove_file(item)
else:
continue
item.store()

def create_symlink(self, item):
def _create_symlink(self, item: Item):
dest = self.destination(item)
util.mkdirall(dest)
link = (
Expand All @@ -448,8 +457,8 @@ def create_symlink(self, item):
)
util.link(link, dest)

def sync_art(self, item, path):
# FIXME: symlink art
@override
def _sync_art(self, item: Item, path: bytes):
pass


Expand Down
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -35,6 +35,7 @@ confuse = "^2.0.1"
mediafile = "^0.12.0"
typeguard = "^4.1.5"
pyright = "^1.1.340"
typing-extensions = "^4.9.0"

[tool.pytest.ini_options]
addopts = "--cov --cov-report=term --cov-report=html"
Expand Down
2 changes: 1 addition & 1 deletion test/cli_test.py
Expand Up @@ -273,7 +273,7 @@ def test_update_older(self):
mediafile = MediaFile(syspath(self.get_path(item)))
self.assertEqual(mediafile.composer, "JSB")

def test_no_udpdate_newer(self):
def test_no_update_newer(self):
item = self.add_external_track("myexternal")
item["composer"] = "JSB"
item.store()
Expand Down
5 changes: 4 additions & 1 deletion test/helper.py
Expand Up @@ -321,7 +321,10 @@ def add_external_album(self, ext_name, **kwargs):
return album

def get_path(self, item, path_key="alt.myexternal") -> Optional[bytes]:
return alternatives.External._get_path(item, path_key)
try:
return item[path_key].encode("utf8")
except KeyError:
return None


class MockedWorker(alternatives.Worker):
Expand Down

0 comments on commit b17c5fc

Please sign in to comment.