Skip to content

Commit

Permalink
feat: support webp covers
Browse files Browse the repository at this point in the history
  • Loading branch information
desbma committed Nov 25, 2023
1 parent c650d8c commit 2a4a98d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 45 deletions.
47 changes: 9 additions & 38 deletions amg/__init__.py
Expand Up @@ -30,7 +30,7 @@
import threading
import urllib.parse
import webbrowser
from typing import BinaryIO, Callable, Iterable, Optional, Sequence, Tuple
from typing import Callable, Iterable, Optional, Sequence, Tuple

import appdirs
import lxml.cssselect
Expand Down Expand Up @@ -99,27 +99,19 @@ def fetch_page(url: str, *, http_cache: Optional[web_cache.WebCache] = None) ->
logging.getLogger().info(f"Got data for URL {url!r} from cache")
page = http_cache[url]
else:
logging.getLogger().debug(f"Fetching {url!r}...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers, timeout=TCP_TIMEOUT, proxies=PROXY)
response.raise_for_status()
page = response.content
page = fetch_ressource(url)
if http_cache is not None:
http_cache[url] = page
return lxml.etree.XML(page.decode("utf-8"), HTML_PARSER)


def fetch_ressource(url: str, dest_filepath: str) -> None:
def fetch_ressource(url: str) -> bytes:
"""Fetch ressource, and write it to file."""
logging.getLogger().debug(f"Fetching {url!r}...")
headers = {"User-Agent": USER_AGENT}
with contextlib.closing(
requests.get(url, headers=headers, timeout=TCP_TIMEOUT, proxies=PROXY, stream=True)
) as response:
response.raise_for_status()
with open(dest_filepath, "wb") as dest_file:
for chunk in response.iter_content(2**14):
dest_file.write(chunk)
response = requests.get(url, headers=headers, timeout=TCP_TIMEOUT, proxies=PROXY)
response.raise_for_status()
return response.content


def parse_review_block(review: lxml.etree.Element) -> Optional[ReviewMetadata]:
Expand Down Expand Up @@ -293,30 +285,9 @@ def getPlayCount(self, url: str) -> int:


def get_cover_data(review: ReviewMetadata) -> bytes:
"""Fetch cover and return buffer of JPEG data."""
"""Fetch cover and return buffer of image data."""
cover_url = review.cover_url if review.cover_url is not None else review.cover_thumbnail_url
cover_ext = os.path.splitext(urllib.parse.urlsplit(cover_url).path)[1][1:].lower()

with mkstemp_ctx.mkstemp(prefix="amg_", suffix=f".{cover_ext}") as filepath:
fetch_ressource(cover_url, filepath)

if cover_ext == "png":
# convert to JPEG
img = PIL.Image.open(filepath)
if img.mode != "RGB":
img = img.convert("RGB")
f: BinaryIO = io.BytesIO()
img.save(f, format="JPEG", quality=90, optimize=True)
f.seek(0)
out_bytes = f.read()
else:
if HAS_JPEGOPTIM:
cmd = ("jpegoptim", "-q", "--strip-all", filepath)
subprocess.run(cmd, check=True)
with open(filepath, "rb") as f:
out_bytes = f.read()

return out_bytes
return fetch_ressource(cover_url)


def download_and_merge(
Expand Down Expand Up @@ -573,7 +544,7 @@ def play(review: ReviewMetadata, track_urls: Sequence[str], *, merge_with_pictur
# TODO support other players (vlc, avplay, ffplay...)
merge_with_picture = merge_with_picture and HAS_FFMPEG
if merge_with_picture:
with mkstemp_ctx.mkstemp(prefix="amg_", suffix=".jpg") as cover_filepath:
with mkstemp_ctx.mkstemp(prefix="amg_") as cover_filepath:
cover_data = get_cover_data(review)
with open(cover_filepath, "wb") as f:
f.write(cover_data)
Expand Down
22 changes: 19 additions & 3 deletions amg/tag.py
Expand Up @@ -7,14 +7,17 @@
import datetime
import functools
import itertools
import io
import logging
import operator
import re
import string
from typing import Any, Deque, Dict, List, Optional, Sequence, Tuple

import magic
import more_itertools
import mutagen
import PIL.Image
import unidecode

from amg import sanitize
Expand Down Expand Up @@ -697,18 +700,31 @@ def has_embedded_album_art(filepath: str) -> bool:

def embed_album_art(mf: mutagen.File, cover_data: bytes):
"""Embed album art into audio file."""
mime = magic.from_buffer(cover_data, mime=True)
if isinstance(mf, mutagen.ogg.OggFileType):
picture = mutagen.flac.Picture()
picture.data = cover_data
picture.type = mutagen.id3.PictureType.COVER_FRONT
picture.mime = "image/jpeg"
picture.mime = mime
encoded_data = base64.b64encode(picture.write())
mf["metadata_block_picture"] = encoded_data.decode("ascii")
elif isinstance(mf, mutagen.mp3.MP3):
mf.tags.add(mutagen.id3.APIC(mime="image/jpeg", type=mutagen.id3.PictureType.COVER_FRONT, data=cover_data))
mf.tags.add(mutagen.id3.APIC(mime=mime, type=mutagen.id3.PictureType.COVER_FRONT, data=cover_data))
mf.save()
elif isinstance(mf, mutagen.mp4.MP4):
mf["covr"] = [mutagen.mp4.MP4Cover(cover_data, imageformat=mutagen.mp4.AtomDataType.JPEG)]
if mime == "image/jpeg":
fmt = mutagen.mp4.AtomDataType.JPEG
elif mime == "image/png":
fmt = mutagen.mp4.AtomDataType.PNG
else:
# convert to jpeg
in_bytes = io.BytesIO(cover_data)
img = PIL.Image.open(in_bytes)
out_bytes = io.BytesIO()
img.save(out_bytes, format="JPEG", quality=85, optimize=True)
cover_data = out_bytes.getvalue()
fmt = mutagen.mp4.AtomDataType.JPEG
mf["covr"] = [mutagen.mp4.MP4Cover(cover_data, imageformat=fmt)]


# copy month names before changing locale
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -5,6 +5,7 @@ lxml>=4.3.3
more_itertools>=8.3.0
mutagen>=1.42.0
Pillow>=6.0.0
python-magic>=0.4.27
requests>=2.22.0
tqdm==4.28.1
unidecode>=1.0.23
Expand Down
11 changes: 7 additions & 4 deletions tests/test_tag.py
Expand Up @@ -23,7 +23,9 @@ def download(url, filepath):
if os.path.isfile(cache_filepath):
shutil.copyfile(cache_filepath, filepath)
return
amg.fetch_ressource(url, filepath)
data = amg.fetch_ressource(url)
with open(filepath, "wb") as f:
f.write(data)
if cache_dir is not None:
shutil.copyfile(filepath, cache_filepath)

Expand Down Expand Up @@ -87,7 +89,8 @@ def test_tag(self):
"""Test tagging for various formats."""
artist = "Artist"
album = "Album"
cover_data = os.urandom(random.randint(10000, 500000))
# https://github.com/mathiasbynens/small/blob/master/jpeg.jpg
cover_data = b"\xff\xd8\xff\xdb\x00C\x00\x03\x02\x02\x02\x02\x02\x03\x02\x02\x02\x03\x03\x03\x03\x04\x06\x04\x04\x04\x04\x04\x08\x06\x06\x05\x06\t\x08\n\n\t\x08\t\t\n\x0c\x0f\x0c\n\x0b\x0e\x0b\t\t\r\x11\r\x0e\x0f\x10\x10\x11\x10\n\x0c\x12\x13\x12\x10\x13\x0f\x10\x10\x10\xff\xc9\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xcc\x00\x06\x00\x10\x10\x05\xff\xda\x00\x08\x01\x01\x00\x00?\x00\xd2\xcf \xff\xd9"
review = amg.ReviewMetadata(None, artist, album, None, None, None)

# vorbis
Expand All @@ -100,7 +103,7 @@ def test_tag(self):
self.assertEqual(tags[k], v)
self.assertIn("metadata_block_picture", tags)
self.assertEqual(len(tags["metadata_block_picture"]), 1)
self.assertIn(base64.b64encode(cover_data).decode(), tags["metadata_block_picture"][0])
self.assertIn(cover_data, base64.b64decode(tags["metadata_block_picture"][0]))
self.assertTrue(amg.tag.has_embedded_album_art(self.vorbis_filepath))

# opus
Expand All @@ -113,7 +116,7 @@ def test_tag(self):
self.assertEqual(tags[k], v)
self.assertIn("metadata_block_picture", tags)
self.assertEqual(len(tags["metadata_block_picture"]), 1)
self.assertIn(base64.b64encode(cover_data).decode(), tags["metadata_block_picture"][0])
self.assertIn(cover_data, base64.b64decode(tags["metadata_block_picture"][0]))
self.assertTrue(amg.tag.has_embedded_album_art(self.opus_filepath))

# mp3
Expand Down

0 comments on commit 2a4a98d

Please sign in to comment.