Skip to content

Commit

Permalink
f-strings for the win
Browse files Browse the repository at this point in the history
  • Loading branch information
desbma committed Feb 21, 2019
1 parent 6af0da3 commit 3cb7b21
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 58 deletions.
69 changes: 32 additions & 37 deletions amg/__init__.py
Expand Up @@ -61,7 +61,7 @@
"tags"))

ROOT_URL = "https://www.angrymetalguy.com/"
REVIEW_URL = "%scategory/reviews/" % (ROOT_URL)
REVIEW_URL = f"{ROOT_URL}category/reviews/"
LAST_PLAYED_EXPIRATION_DAYS = 365
HTML_PARSER = lxml.etree.HTMLParser()
REVIEW_BLOCK_SELECTOR = lxml.cssselect.CSSSelector("article.category-review, "
Expand All @@ -75,17 +75,17 @@
REVERBNATION_SCRIPT_SELECTOR = lxml.cssselect.CSSSelector("script")
IS_TRAVIS = os.getenv("CI") and os.getenv("TRAVIS")
TCP_TIMEOUT = 30.1 if IS_TRAVIS else 15.1
YDL_MAX_DOWNLOAD_TRIES = 5
USER_AGENT = "Mozilla/5.0 AMG-Player/{}".format(__version__)
YDL_MAX_DOWNLOAD_ATTEMPTS = 5
USER_AGENT = f"Mozilla/5.0 AMG-Player/{__version__}"


def fetch_page(url, *, http_cache=None):
""" Fetch page & parse it with LXML. """
if (http_cache is not None) and (url in http_cache):
logging.getLogger().info("Got data for URL '%s' from cache" % (url))
logging.getLogger().info(f"Got data for URL '{url}' from cache")
page = http_cache[url]
else:
logging.getLogger().debug("Fetching '%s'..." % (url))
logging.getLogger().debug(f"Fetching '{url}'...")
headers = {"User-Agent": USER_AGENT}
response = requests.get(url, headers=headers, timeout=TCP_TIMEOUT)
response.raise_for_status()
Expand All @@ -97,7 +97,7 @@ def fetch_page(url, *, http_cache=None):

def fetch_ressource(url, dest_filepath):
""" Fetch ressource, and write it to file. """
logging.getLogger().debug("Fetching '%s'..." % (url))
logging.getLogger().debug(f"Fetching '{url}'...")
headers = {"User-Agent": USER_AGENT}
with contextlib.closing(requests.get(url, headers=headers, timeout=TCP_TIMEOUT, stream=True)) as response:
response.raise_for_status()
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_reviews():
for i in itertools.count():
url = REVIEW_URL
if i > 0:
url += "page/%u" % (i + 1)
url += f"page/{i + 1}"
page = fetch_page(url)
for review in REVIEW_BLOCK_SELECTOR(page):
r = parse_review_block(review)
Expand Down Expand Up @@ -176,7 +176,7 @@ def get_embedded_track(page, http_cache):
rn_prefix = "https://www.reverbnation.com/widget_code/"
if any(map(iframe_url.startswith, yt_prefixes)):
yt_id = urllib.parse.urlparse(iframe_url).path.rsplit("/", 1)[-1]
urls = ("https://www.youtube.com/watch?v=%s" % (yt_id),)
urls = (f"https://www.youtube.com/watch?v={yt_id}",)
elif iframe_url.startswith(bc_prefix):
iframe_page = fetch_page(iframe_url, http_cache=http_cache)
js = BANDCAMP_JS_SELECTOR(iframe_page)[-1].text
Expand Down Expand Up @@ -209,9 +209,9 @@ def get_embedded_track(page, http_cache):
urls = (url,)
audio_only = True
except Exception as e:
logging.getLogger().error("%s: %s" % (e.__class__.__qualname__, e))
logging.getLogger().error(f"{e.__class__.__qualname__}: {e}")
if urls is not None:
logging.getLogger().debug("Track URL(s): %s" % (" ".join(urls)))
logging.getLogger().debug(f"Track URL(s): {' '.join(urls)}")
return urls, audio_only


Expand Down Expand Up @@ -275,7 +275,7 @@ def get_cover_data(review):
cover_url = review.cover_url if review.cover_url is not None else review.cover_thumbnail_url
cover_ext = os.path.splitext(urllib.parse.urlsplit(cover_url).path)[1][1:].lower()

with mkstemp_ctx.mkstemp(prefix="amg_", suffix=".%s" % (cover_ext)) as filepath:
with mkstemp_ctx.mkstemp(prefix="amg_", suffix=f".{cover_ext}") as filepath:
fetch_ressource(cover_url, filepath)

if cover_ext == "png":
Expand Down Expand Up @@ -327,7 +327,7 @@ def download_and_merge(review, track_urls, tmp_dir, cover_filepath):
concat_filepath = tempfile.mktemp(dir=tmp_dir, suffix=".txt")
with open(concat_filepath, "wt") as concat_file:
for audio_filepath in audio_filepaths:
concat_file.write("file %s\n" % (audio_filepath))
concat_file.write(f"file {audio_filepath}\n")

# merge
merged_filepath = tempfile.mktemp(dir=tmp_dir, suffix=".mkv")
Expand All @@ -341,7 +341,7 @@ def download_and_merge(review, track_urls, tmp_dir, cover_filepath):
"-c:v", "libx264", "-crf", "18", "-tune:v", "stillimage", "-preset", "ultrafast",
"-shortest",
"-f", "matroska", merged_filepath)
logging.getLogger().debug("Merging Audio and image with command: %s" % (subprocess.list2cmdline(cmd)))
logging.getLogger().debug(f"Merging Audio and image with command: {subprocess.list2cmdline(cmd)}")
subprocess.check_call(cmd, cwd=tmp_dir)

return merged_filepath
Expand All @@ -353,12 +353,12 @@ def download_audio(review, track_urls, *, max_cover_size):
with ytdl_tqdm.ytdl_tqdm(leave=False,
mininterval=0.05,
miniters=1) as ytdl_progress:
ydl_opts = {"outtmpl": os.path.join(tmp_dir,
("%s-" % (review.date_published.strftime("%Y%m%d%H%M%S"))) +
r"%(autonumber)s" +
(". %s - %s" % (sanitize.sanitize_for_path(review.artist.replace(os.sep, "_")),
sanitize.sanitize_for_path(review.album.replace(os.sep, "_")))) +
r".%(ext)s"),
filename_template = (f"{review.date_published.strftime('%Y%m%d%H%M%S')}-"
r"%(autonumber)s"
f". {sanitize.sanitize_for_path(review.artist.replace(os.sep, '_'))} - "
f"{sanitize.sanitize_for_path(review.album.replace(os.sep, '_'))}"
r".%(ext)s")
ydl_opts = {"outtmpl": os.path.join(tmp_dir, filename_template),
"format": "opus/vorbis/bestaudio",
"postprocessors": [{"key": "FFmpegExtractAudio"},
{"key": "FFmpegMetadata"}],
Expand Down Expand Up @@ -424,9 +424,8 @@ def download_audio(review, track_urls, *, max_cover_size):
files_tags[track_filepath] = tag.tag(track_filepath, review, cover_data)
except Exception as e:
# raise
logging.getLogger().warning("Failed to add tags to file '%s': %s %s" % (track_filepath,
e.__class__.__qualname__,
e))
logging.getLogger().warning(f"Failed to add tags to file '{track_filepath}': "
f"{e.__class__.__qualname__} {e}")
# RG/R128
if HAS_FFMPEG:
r128gain.process(track_filepaths, album_gain=len(track_filepaths) > 1)
Expand All @@ -444,7 +443,7 @@ def download_audio(review, track_urls, *, max_cover_size):
filename = " - ".join((filename, sanitize.sanitize_for_path(file_tags["title"][-1])))
dest_filename = "".join((filename, ext))
dest_filepath = os.path.join(os.getcwd(), dest_filename)
logging.getLogger().debug("Moving %s to %s" % (repr(track_filepath), repr(dest_filepath)))
logging.getLogger().debug(f"Moving {repr(track_filepath)} to {repr(dest_filepath)}")
shutil.move(track_filepath, dest_filepath)

return True
Expand All @@ -465,24 +464,24 @@ def play(review, track_urls, *, merge_with_picture):
if merged_filepath is None:
return
cmd = ("mpv", merged_filepath)
logging.getLogger().debug("Playing with command: %s" % (subprocess.list2cmdline(cmd)))
logging.getLogger().debug(f"Playing with command: {subprocess.list2cmdline(cmd)}")
subprocess.check_call(cmd)

else:
for track_url in track_urls:
cmd_dl = ("youtube-dl", "-o", "-", track_url)
logging.getLogger().debug("Downloading with command: %s" % (subprocess.list2cmdline(cmd_dl)))
logging.getLogger().debug(f"Downloading with command: {subprocess.list2cmdline(cmd_dl)}")
dl_process = subprocess.Popen(cmd_dl,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
cmd = ("mpv", "--force-seekable=yes", "-")
logging.getLogger().debug("Playing with command: %s" % (subprocess.list2cmdline(cmd)))
logging.getLogger().debug(f"Playing with command: {subprocess.list2cmdline(cmd)}")
subprocess.check_call(cmd, stdin=dl_process.stdout)


def cl_main():
# parse args
arg_parser = argparse.ArgumentParser(description="AMG Player v%s. %s" % (__version__, __doc__),
arg_parser = argparse.ArgumentParser(description=f"AMG Player v{__version__}. {__doc__}",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument("-c",
"--count",
Expand Down Expand Up @@ -558,7 +557,7 @@ def cl_main():
compression=web_cache.Compression.DEFLATE)
purged_count = http_cache.purge()
row_count = len(http_cache)
logging.getLogger().debug("HTTP Cache contains %u entries (%u removed)" % (row_count, purged_count))
logging.getLogger().debug(f"HTTP Cache contains {row_count} entries ({purged_count} removed)")

# initial menu
if args.mode in (PlayerMode.MANUAL, PlayerMode.RADIO):
Expand Down Expand Up @@ -601,15 +600,11 @@ def cl_main():
logging.getLogger().warning("Unable to extract embedded track")
else:
print("-" * (shutil.get_terminal_size()[0] - 1))
print("Artist: %s\n"
"Album: %s\n"
"Review URL: %s\n"
"Published: %s\n"
"Tags: %s" % (review.artist,
review.album,
review.url,
review.date_published.strftime("%x %H:%M"),
", ".join(review.tags)))
print(f"Artist: {review.artist}\n"
f"Album: {review.album}\n"
f"Review URL: {review.url}\n"
f"Published: {review.date_published.strftime('%x %H:%M')}\n"
f"Tags: {', '.join(review.tags)}")
if args.interactive:
input_loop = True
while input_loop:
Expand Down
2 changes: 1 addition & 1 deletion amg/colored_logging.py
Expand Up @@ -26,5 +26,5 @@ def format(self, record):
except KeyError:
pass
else:
message = "\033[%u;%um%s\033[0m" % (int(bold), 30 + color_code, message)
message = f"\033[{bold:d};{30 + color_code}m{message}\033[0m"
return message
20 changes: 9 additions & 11 deletions amg/menu.py
Expand Up @@ -15,13 +15,12 @@ class AmgMenu(cursesmenu.CursesMenu):
def __init__(self, *, reviews, known_reviews, http_cache, mode, selected_idx):
menu_subtitle = {amg.PlayerMode.MANUAL: "Select a track",
amg.PlayerMode.RADIO: "Select track to start from"}
super().__init__("AMG Player v%s" % (amg.__version__),
"%s mode: %s "
super().__init__(f"AMG Player v{amg.__version__}",
f"{mode.name.capitalize()} mode: {menu_subtitle[mode]} "
"(ENTER to play, "
"D to download audio, "
"R to open review, "
"Q to exit)" % (mode.name.capitalize(),
menu_subtitle[mode]),
"Q to exit)",
True)
if selected_idx is not None:
self.current_option = selected_idx
Expand Down Expand Up @@ -59,9 +58,8 @@ def reviewsToStrings(reviews, known_reviews, http_cache):
for i, review in enumerate(reviews):
try:
play_count = known_reviews.getPlayCount(review.url)
played = "Last played: %s (%u time%s)" % (known_reviews.getLastPlayed(review.url).strftime("%x %H:%M"),
play_count,
"s" if play_count > 1 else "")
played = (f"Last played: {known_reviews.getLastPlayed(review.url).strftime('%x %H:%M')} "
f"({play_count} time{'s' if play_count > 1 else ''})")
except KeyError:
if review.url in http_cache:
review_page = amg.fetch_page(review.url, http_cache=http_cache)
Expand All @@ -71,8 +69,8 @@ def reviewsToStrings(reviews, known_reviews, http_cache):
played = "Last played: never"
else:
played = "Last played: never"
lines.append(("%s - %s" % (review.artist, review.album),
"Published: %s" % (review.date_published.strftime("%x")),
lines.append((f"{review.artist} - {review.album}",
f"Published: {review.date_published.strftime('%x')}",
played))
# auto align/justify
max_lens = [0] * len(lines[0])
Expand All @@ -82,8 +80,8 @@ def reviewsToStrings(reviews, known_reviews, http_cache):
max_lens[i] = len(s)
sep = "\t"
for i, line in enumerate(lines):
lines[i] = "%s%s" % (" " if i < 9 else "",
sep.join(s.ljust(max_len + 1) for s, max_len in zip(line, max_lens)))
lines[i] = "".join((" " if i < 9 else "",
sep.join(s.ljust(max_len + 1) for s, max_len in zip(line, max_lens))))
return lines

@staticmethod
Expand Down
3 changes: 1 addition & 2 deletions amg/sanitize.py
Expand Up @@ -4,8 +4,7 @@
import unidecode


VALID_PATH_CHARS = frozenset("-_.()!#$%%&'@^{}~ %s%s" % (string.ascii_letters,
string.digits))
VALID_PATH_CHARS = frozenset(f"-_.()!#$%%&'@^{{}}~ {string.ascii_letters}{string.digits}")
TAG_LOWERCASE_WORDS = frozenset(("a", "an", "and", "at", "for", "from", "in",
"of", "on", "or", "over", "the", "to", "with",
"de", "des", "du", "le", "la", "les",
Expand Down
13 changes: 6 additions & 7 deletions amg/tag.py
Expand Up @@ -115,8 +115,8 @@ def __init__(self, artist, album):
for y in range(year - 5, year + 1):
expressions.append(str(y))
for month_name, month_abbr in zip(MONTH_NAMES, MONTH_NAMES_ABBR):
expressions.append("%s %u" % (month_name, y))
expressions.append("%s %u" % (month_abbr, y))
expressions.append(f"{month_name} {y}")
expressions.append(f"{month_abbr} {y}")
expressions.sort(key=len, reverse=True)
expressions.remove("song")
suffix_cleaner = SimpleSuffixCleaner()
Expand Down Expand Up @@ -170,9 +170,8 @@ def cleanup(self, title):

new_title = cleaner.cleanup(cur_title, *args)
if new_title and (new_title != cur_title):
logging.getLogger().debug("%s changed title tag: %s -> %s" % (cleaner.__class__.__name__,
repr(cur_title),
repr(new_title)))
logging.getLogger().debug(f"{cleaner.__class__.__name__} changed title tag: "
f"{repr(cur_title)} -> {repr(new_title)}")
# update string and remove this cleaner to avoid calling it several times
cur_title = new_title
remove_cur_cleaner = not cleaner.doKeep()
Expand All @@ -198,7 +197,7 @@ def cleanup(self, title):
del self.cleaners[to_del_idx]

if cur_title != title:
logging.getLogger().info("Fixed title tag: %s -> %s" % (repr(title), repr(cur_title)))
logging.getLogger().info(f"Fixed title tag: {repr(title)} -> {repr(cur_title)}")
return cur_title


Expand Down Expand Up @@ -493,7 +492,7 @@ def normalize_title_tag(title, artist, album):

def tag(track_filepath, review, cover_data):
""" Tag an audio file, return tag dict excluding RG/R128 info and album art. """
logging.getLogger().info("Tagging file '%s'" % (track_filepath))
logging.getLogger().info(f"Tagging file '{track_filepath}'")
mf = mutagen.File(track_filepath)
if isinstance(mf, mutagen.mp3.MP3):
mf = mutagen.easyid3.EasyID3(track_filepath)
Expand Down

0 comments on commit 3cb7b21

Please sign in to comment.