Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beets/config_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import:
set_fields: {}
ignored_alias_types: []
singleton_album_disambig: yes
fix_ext_inplace: no

# --------------- Paths ---------------

Expand Down
7 changes: 7 additions & 0 deletions beets/importer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from beets.autotag.hooks import AlbumMatch
from beets.autotag.match import tag_album, tag_item
from beets.dbcore.query import PathQuery
from beets.util import extension

from .state import ImportState

Expand Down Expand Up @@ -1071,6 +1072,12 @@ def read_item(self, path: util.PathBytes):
If an item cannot be read, return `None` instead and log an
error.
"""

# Check if the file has an extension,
# Add an extension if there isn't one.
if os.path.isfile(path):
path = extension.fix_extension(path, logger=log)

try:
return library.Item.from_path(path)
except library.ReadError as exc:
Expand Down
1 change: 0 additions & 1 deletion beets/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@

from beets.library import Item


MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = "\\\\?\\"
T = TypeVar("T")
Expand Down
149 changes: 149 additions & 0 deletions beets/util/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

"""A tool that finds an extension for files without one"""

import os
import subprocess
from logging import Logger
from pathlib import Path

import beets
from beets import util

logger = Logger.info

PathBytes = bytes
PATH_SEP: bytes = util.bytestring_path(os.sep)

# a list of audio formats I got from wikipedia https://en.wikipedia.org/wiki/Audio_file_format
AUDIO_EXTENSIONS = {
"3gp",
"aa",
"aac",
"aax",
"act",
"aiff",
"alac",
"amr",
"ape",
"au",
"awb",
"dss",
"dvf",
"flac",
"gsm",
"iklax",
"ivs",
"m4a",
"m4b",
"m4p",
"mmf",
"movpkg",
"mp1",
"mp2",
"mp3",
"mpc",
"msv",
"nmf",
"ogg",
"oga",
"mogg",
"opus",
"ra",
"rm",
"raw",
"rf64",
"sln",
"tta",
"voc",
"vox",
"wav",
"wma",
"wv",
"webm",
"8svx",
"cda",
}


def fix_extension(path_bytes: PathBytes, logger: Logger | None = None):
"""Return the `path` after adding an appropriate extension if needed.

If the file already has an extension, return as-is.
If the file has no extension, try to find the format using ffprobe.
If the file is not a music format, return as-is.
If the format is found, return path with extension.
"""
path = Path(os.fsdecode(path_bytes))
# if there is an extension, return unchanged
if path.suffix != "":
return path_bytes

# no extension detected
# use ffprobe to find the format
formats = []
shell = os.name == "nt"
if (
subprocess.run(
["ffprobe", "-version"], capture_output=True, shell=shell
).stderr.decode("utf-8")
!= ""
):
if logger:
logger.error("ffprobe needed to determine file extension")
output = subprocess.run(
[
"ffprobe",
"-hide_banner",
"-loglevel",
"fatal",
"-show_format",
"--",
str(path),
],
capture_output=True,
shell=shell,
)
out = output.stdout.decode("utf-8")
err = output.stderr.decode("utf-8")
if err != "":
if logger:
logger.error("Error with ffprobe\n", err)
for line in out.split("\n"):
if line.startswith("format_name="):
formats = line.split("=")[1].split(",")
detected_format = ""
# The first format from ffprobe that is on this list is taken
for f in formats:
if f in AUDIO_EXTENSIONS:
detected_format = f
break

# if ffprobe can't find a format, the file is prob not music
if detected_format == "":
return path_bytes

# cp and add ext. If already exist, use that file
# assume, for example, the only diff between 'asdf.mp3' and 'asdf' is format
new_path = path.with_suffix("." + detected_format)
if not new_path.exists():
if beets.config["import"]["fix_ext_inplace"]:
util.move(bytes(path), bytes(new_path))
else:
util.copy(bytes(path), bytes(new_path))
else:
if logger:
logger.info("Import file with matching format to original target")
return new_path
3 changes: 3 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Unreleased
New features
~~~~~~~~~~~~

- :ref:`import-cmd` Use ffprobe to recognize format of any import music file
that has no extension. If the file cannot be recognized as a music file, leave
it alone. :bug:`4881`
- Query: Add ``has_cover_art`` computed field to query items by embedded cover
art presence. Users can now search for tracks with or without embedded artwork
using ``beet list has_cover_art:true`` or ``beet list has_cover_art:false``.
Expand Down
17 changes: 17 additions & 0 deletions docs/reference/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,23 @@ feature is currently supported by the :doc:`/plugins/discogs` and the

Default: ``yes``.

.. _fix_ext_inplace:

fix_ext_inplace
~~~~~~~~~~~~~~~

The extension of each file is checked at import. If a file has no extension and
Comment thread
gaotue marked this conversation as resolved.
its binary matches a music format, beets will look for a file with the same name
and matching extension in the same directory. For example, when importing an mp3
file named ``asdf``, beets look for ``asdf.mp3``. If found, that file will be
imported instead. Otherwise, if ``fix_ext_inplace`` is ``yes``, then the file
will be renamed to contain the extension. If ``fix_ext_inplace`` is ``no``, then
the original will be left untouched and a copy with extension will be created in
the same directory. This is only done if the user has ``ffprobe`` (bundled with
FFmpeg)

Default: ``no``.

.. _match-config:

Autotagger Matching Options
Expand Down
Binary file added test/rsrc/no_ext
Binary file not shown.
Empty file added test/rsrc/no_ext_not_music
Empty file.
57 changes: 57 additions & 0 deletions test/test_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,63 @@ def test_set_fields(self):
assert item.disc == disc


@pytest.mark.skipif(
not has_program("ffprobe", ["-L"]),
"need ffprobe for format recognition",
)
class ImportFormatTest:
"""Test fix_extension during import."""

def test_recognize_format(self):
resource_src = os.path.join(_common.RSRC, b"no_ext")
resource_path = os.path.join(self.import_dir, b"no_ext")
util.copy(resource_src, resource_path)
self.setup_importer()
self.importer.paths = [resource_path]
self.importer.run()
assert self.lib.items().get().path.endswith(b".mp3")

def test_recognize_format_already_exist(self):
resource_path = os.path.join(_common.RSRC, b"no_ext")
temp_resource_path = os.path.join(self.temp_dir, b"no_ext")
util.copy(resource_path, temp_resource_path)
new_path = os.path.join(self.temp_dir, b"no_ext.mp3")
util.copy(temp_resource_path, new_path)
self.setup_importer()
self.importer.paths = [temp_resource_path]
with capture_log() as logs:
self.importer.run()
assert "Import file with matching format to original target" in logs
assert self.lib.items().get().path.endswith(b".mp3")

def test_recognize_format_not_music(self):
resource_path = os.path.join(_common.RSRC, b"no_ext_not_music")
self.setup_importer()
self.importer.paths = [resource_path]
self.importer.run()
assert len(self.lib.items()) == 0

def test_recognize_format_change_original(self):
config["import"]["fix_ext_inplace"] = True
resource_src = os.path.join(_common.RSRC, b"no_ext")
resource_path = os.path.join(self.temp_dir, b"no_ext")
util.copy(resource_src, resource_path)
self.setup_importer()
self.importer.paths = [resource_path]
self.importer.run()
assert not Path(os.path.join(self.temp_dir_path, "no_ext")).exists()

def test_recognize_format_keep_original(self):
config["import"]["fix_ext_inplace"] = False
resource_src = os.path.join(_common.RSRC, b"no_ext")
resource_path = os.path.join(self.temp_dir, b"no_ext")
util.copy(resource_src, resource_path)
self.setup_importer()
self.importer.paths = [resource_path]
self.importer.run()
assert Path(os.path.join(self.temp_dir_path, "no_ext")).exists()


class ImportTest(PathsMixin, AutotagImportTestCase):
"""Test APPLY, ASIS and SKIP choices."""

Expand Down
Loading