Configuration cell

In [None]:
# -------------------------
# CONFIGURATION
# -------------------------

from pathlib import Path
import shutil
import logging
import time
import base64
import json
import re
from typing import Optional, Tuple, List
import requests
from mutagen.flac import FLAC, Picture
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB, TDRC, TRCK, TPOS, TCON, TSRC, ID3NoHeaderError
from mutagen.wave import WAVE
import unicodedata
from send2trash import send2trash
import platform
import struct
import io
import wave
import tempfile

# User-editable configuration
CREDENTIALS_PATH = Path("credentials.json")
RECURSIVE = False   # True = also apply to files in subdirectories
PROCESS_TOP_X = 100   # (int) number of files to process in this run

# -------------------------
# Filename parsing mode:
# 0 = "Artist - Title"  (default, left=artist, right=title)
# 1 = "Title - Artist"  (left=title, right=artist)
# Applies to FLAC, MP3 and WAV filename fallback parsing.
# NOTE: the value currently configured below is 1 ("Title - Artist").
FILENAME_PARSE_MODE = 1
# -------------------------

# Spotify endpoints
# The "{}" placeholders are filled with an artist/album id via str.format().
SPOTIFY_TOKEN_URL =          "https://accounts.spotify.com/api/token"
SPOTIFY_SEARCH_URL =         "https://api.spotify.com/v1/search"
SPOTIFY_ARTIST_URL =         "https://api.spotify.com/v1/artists/{}"
SPOTIFY_ARTIST_ALBUMS_URL =  "https://api.spotify.com/v1/artists/{}/albums"
SPOTIFY_ALBUM_TRACKS_URL =   "https://api.spotify.com/v1/albums/{}/tracks"

# Timeouts and limits
# REQUEST_TIMEOUT is passed as `timeout=` to every requests call (seconds).
# SPOTIFY_MAX_LIMIT is the page size used for paginated requests and the upper
# bound applied to a single search request.
REQUEST_TIMEOUT = 12
SPOTIFY_MAX_LIMIT = 50

# Behavior flags
OVERWRITE_TITLE_ARTIST_OR_ALBUM = 1   # 0 = preserve title/artist/album, 1 = overwrite
UPDATE_ONLY_GENRE = 0                 # 1 = only update genre
PRINT_SEARCH_INFO = 1                 # 1 = extended logs
SEARCH_CANDIDATE_LIMIT = 5            # number of spotify tracks to search per music file
MARKET: Optional[str] = None          # set e.g. "US" or "ES" to restrict results

# Root logger: INFO level, "LEVEL: message" lines.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

Music utilities functions

In [None]:
# -------------------------
# MUSIC UTILITIES SECTION
# -------------------------
_FILENAME_SPLIT_RE = re.compile(r"\s[-–—]\s")


def infer_artist_title_from_filename(p: Path) -> Tuple[Optional[str], Optional[str]]:
    """
    Guess (artist, title) from the file name stem of *p*.

    Splitting rules, tried in order:
      1. First occurrence of a dash (hyphen, en dash or em dash) surrounded by whitespace.
      2. First plain hyphen anywhere in the stem.
      3. No separator at all: the whole stem is the title and the artist is unknown.

    For rules 1 and 2, FILENAME_PARSE_MODE decides which side is which:
    mode 0 means "Artist - Title"; any other value means "Title - Artist".
    An empty side is reported as None. An empty stem yields (None, None).
    """
    stem = p.stem.strip()
    if not stem:
        return None, None

    # Spaced separator wins; the bare hyphen is only a fallback.
    pieces = _FILENAME_SPLIT_RE.split(stem, maxsplit=1)
    if len(pieces) != 2 and "-" in stem:
        pieces = stem.split("-", 1)

    if len(pieces) != 2:
        # Nothing to split on: the stem is all we know about the title.
        return None, stem or None

    first, second = (piece.strip() or None for piece in pieces)
    if FILENAME_PARSE_MODE == 0:
        return first, second
    return second, first


def unique_temp_copy(src: Path) -> Path:
    """
    Copy *src* next to itself under a not-yet-used temporary name and return that path.

    The first candidate is "<name>.tmp"; if it exists, "<name>.tmp1", "<name>.tmp2", ...
    are tried in turn. The copy is made with shutil.copy2.
    """
    candidate = src.with_name(src.name + ".tmp")
    suffix_no = 1
    while candidate.exists():
        candidate = src.with_name(f"{src.name}.tmp{suffix_no}")
        suffix_no += 1
    shutil.copy2(str(src), str(candidate))
    return candidate


def send_original_to_trash(original: Path) -> None:
    """
    Move *original* to the system trash, falling back to permanent deletion.

    This stays best-effort (it never raises), but every failure is now logged so
    that a file which could neither be trashed nor deleted does not go unnoticed.
    """
    try:
        send2trash(str(original))
        return
    except Exception as exc:  # send2trash raises platform-specific error types
        logging.warning("Could not move %s to trash (%s); deleting it instead", original, exc)

    try:
        original.unlink()
    except Exception as exc:
        logging.error("Could not delete %s: %s", original, exc)

Search utilities functions

In [None]:
# -------------------------
# SEARCH UTILITIES SECTION
# -------------------------
def _strip_parentheses_with_feat(s: Optional[str]) -> str:
    """
    Remove "(...)" and "[...]" groups that mention feat/ft and collapse whitespace.

    Bracketed groups without a feat/ft marker are kept untouched.
    A falsy input yields "".
    """
    if not s:
        return ""

    feat_marker = re.compile(r"\b(feat\.?|ft\.?)\b", flags=re.IGNORECASE)

    def drop_if_featuring(match):
        # group(1) is the text between the brackets, group(0) the whole group
        return " " if feat_marker.search(match.group(1)) else match.group(0)

    cleaned = s
    # Round brackets first, then square brackets.
    for bracket_pattern in (r"\(([^)]*)\)", r"\[([^]]*)\]"):
        cleaned = re.sub(bracket_pattern, drop_if_featuring, cleaned)
    return re.sub(r"\s+", " ", cleaned).strip()


def _extract_remixer_tokens_from_title(s: Optional[str]) -> List[str]:
    """
    Return the lowercase tokens naming the remixer(s) found in a title.

    Every "(... remix ...)" group is scanned first, then every "[... remix ...]"
    group. Inside a group the word "remix" is dropped and the rest is reduced to
    ASCII alphanumeric tokens.

    Accents are folded (NFKD + removal of combining marks) BEFORE the ASCII
    filter, so "Rémi" becomes "remi". This is the same order as
    _normalize_text_basic, whose output these tokens are compared against.
    Filtering first would blank the accented letter and split the name.
    """
    if not s:
        return []

    tokens: List[str] = []
    for pattern in (r"\(([^)]*remix[^)]*)\)", r"\[([^]]*remix[^]]*)\]"):
        for match in re.finditer(pattern, s, flags=re.IGNORECASE):
            name = re.sub(r"\bremix\b", " ", match.group(1), flags=re.IGNORECASE)
            name = unicodedata.normalize("NFKD", name)
            name = "".join(ch for ch in name if not unicodedata.combining(ch))
            name = re.sub(r"[^0-9a-zA-Z\s]", " ", name)
            tokens.extend(name.lower().split())
    return tokens


def _normalize_text_basic(s: Optional[str]) -> str:
    """
    Reduce *s* to lowercase ASCII alphanumeric words separated by single spaces.

    Steps: fold accents (NFKD, drop combining marks), blank out round brackets,
    blank out the words feat/ft, blank out every remaining character that is not
    an ASCII letter, digit or whitespace, then collapse whitespace.
    A falsy input yields "".
    """
    if not s:
        return ""

    decomposed = unicodedata.normalize("NFKD", s)
    text = "".join(filter(lambda ch: not unicodedata.combining(ch), decomposed))

    # (pattern, flags) pairs, each replaced by a single space, applied in order
    substitutions = (
        (r"[()]+", 0),
        (r"\b(feat\.?|ft\.?)\b", re.IGNORECASE),
        (r"[^0-9a-zA-Z\s]", 0),
        (r"\s+", 0),
    )
    for pattern, flags in substitutions:
        text = re.sub(pattern, " ", text, flags=flags)
    return text.strip().lower()


def _normalize_artist_for_search(s: Optional[str]) -> str:
    """
    Normalize an artist string for searching and matching.

    Commas, backslashes and slashes are turned into spaces first, then
    bracketed feat/ft groups are removed and the basic text normalization is
    applied. A falsy input yields "".
    """
    if not s:
        return ""
    separators_to_spaces = str.maketrans({",": " ", "\\": " ", "/": " "})
    without_feat_groups = _strip_parentheses_with_feat(s.translate(separators_to_spaces))
    return _normalize_text_basic(without_feat_groups)


def _normalize_title_for_search(s: Optional[str]) -> str:
    """
    Normalize a title (or album name) for searching and matching.

    Bracketed feat/ft groups are removed before the basic text normalization.
    A falsy input yields "".
    """
    return _normalize_text_basic(_strip_parentheses_with_feat(s)) if s else ""


def _tokens(n: str) -> List[str]:
    """Split a normalized string on whitespace; a falsy input yields []."""
    # str.split() without arguments never produces empty strings.
    return n.split() if n else []


def _tokens_in_candidate(tokens: List[str], candidate_norm: str) -> bool:
    """
    Tell whether every token appears as a whole word in *candidate_norm*.

    An empty token list matches any candidate.
    """
    if not tokens:
        return True
    return set(tokens).issubset(candidate_norm.split())


def _build_sanitized_query(n_artist: str, n_title: str, n_album: str, fielded: bool = True) -> str:
    """
    Build a Spotify search query string from already-normalized fields.

    With ``fielded`` true and at least one non-empty field, the result uses
    field filters in the order track, artist, album, each value double-quoted.
    Otherwise the non-empty values are joined with spaces in the order artist,
    title, album. When nothing is available the literal '""' is returned.
    """
    def quoted(value: str) -> str:
        # Double quotes inside the value would break the quoting, so blank them.
        collapsed = re.sub(r'\s+', ' ', value.replace('"', ' ')).strip()
        return f'"{collapsed}"' if collapsed else ''

    if fielded and (n_artist or n_title or n_album):
        fielded_terms = [
            f'{field}:{quoted(value)}'
            for field, value in (("track", n_title), ("artist", n_artist), ("album", n_album))
            if value
        ]
        return " ".join(fielded_terms)

    plain_terms = [value for value in (n_artist, n_title, n_album) if value]
    return " ".join(plain_terms) if plain_terms else '""'

Search section

In [None]:
# -------------------------
# SPOTIFY CLIENT / SEARCH SECTION
# -------------------------
def get_spotify_token(client_id: str, client_secret: str, ttl_margin: int = 5) -> Tuple[str, int]:
    """
    Request an access token with the client-credentials grant.

    Returns ``(token, expires_at)``, where ``expires_at`` is the current epoch
    time plus the lifetime reported by the server (3600 if it is missing) minus
    ``ttl_margin``.

    Raises whatever ``raise_for_status`` raises on an HTTP error status, and
    KeyError if the response carries no "access_token".
    """
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    basic_auth = base64.b64encode(credentials).decode("ascii")
    response = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={"Authorization": f"Basic {basic_auth}"},
        data={"grant_type": "client_credentials"},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    payload = response.json()
    access_token = payload["access_token"]
    lifetime = int(payload.get("expires_in", 3600))
    return access_token, int(time.time()) + lifetime - ttl_margin


def _spotify_get_json(url: str, token: str, params: dict, market: Optional[str] = None) -> Optional[dict]:
    """
    Shared GET helper for the Spotify wrappers below.

    Sends an authenticated GET and returns the decoded JSON body, or None when
    the request fails, the status is not OK, or the body is not valid JSON.
    Each failure is logged so that an expired token (HTTP 401) or rate limiting
    can be told apart from a search that simply found nothing.
    """
    query = dict(params)
    if market:
        query["market"] = market
    try:
        resp = requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params=query,
            timeout=REQUEST_TIMEOUT,
        )
    except Exception as exc:
        logging.warning("Spotify request to %s failed: %s", url, exc)
        return None
    if not resp.ok:
        logging.warning("Spotify request to %s returned HTTP %s", url, resp.status_code)
        return None
    try:
        return resp.json()
    except Exception as exc:
        logging.warning("Spotify response from %s is not valid JSON: %s", url, exc)
        return None


def spotifysearch(token: str, q: str, type_: str = "track", limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Optional[dict]:
    """
    Run a search for *q* restricted to item type *type_*.

    Returns the decoded JSON response, or None on any failure (logged).
    """
    params = {"q": q, "type": type_, "limit": limit, "offset": offset}
    return _spotify_get_json(SPOTIFY_SEARCH_URL, token, params, market)


def spotify_get_artist_albums(token: str, artist_id: str, limit: int = SPOTIFY_MAX_LIMIT, offset: int = 0, market: Optional[str] = None) -> Optional[dict]:
    """
    Fetch one page of the albums of *artist_id*.

    Returns the decoded JSON response, or None on any failure (logged).
    """
    params = {"limit": limit, "offset": offset}
    return _spotify_get_json(SPOTIFY_ARTIST_ALBUMS_URL.format(artist_id), token, params, market)


def spotify_get_album_tracks(token: str, album_id: str, limit: int = SPOTIFY_MAX_LIMIT, offset: int = 0, market: Optional[str] = None) -> Optional[dict]:
    """
    Fetch one page of the tracks of *album_id*.

    Returns the decoded JSON response, or None on any failure (logged).
    """
    params = {"limit": limit, "offset": offset}
    return _spotify_get_json(SPOTIFY_ALBUM_TRACKS_URL.format(album_id), token, params, market)


def spotify_find_best_match(token: str, artist: Optional[str], album: Optional[str], title: Optional[str],
                            combined_limit: Optional[int] = None) -> Optional[dict]:
    """
    Find the first Spotify item whose normalized fields contain the wanted tokens.

    Strategy:
      1. Build a list of search queries (fielded ones first, a plain one last)
         and examine their results in order. At most ``combined_limit`` distinct
         candidates are examined across ALL queries (default:
         SEARCH_CANDIDATE_LIMIT).
      2. If nothing was accepted and an artist is known, look the artist up and
         walk every track of every album returned for that artist.

    A candidate is accepted when:
      - all title tokens are words of the candidate title,
      - all artist tokens (or, failing that, all remixer tokens taken from the
        title) are words of the candidate artists, and
      - all album tokens are words of the candidate album name.
    A missing field imposes no constraint.

    Returns:
      - the raw search item (a track or an album object, depending on the
        query) for a hit in step 1,
      - a reduced dict ``{"id", "name", "artists", "album": {"name"}}`` for a
        hit in step 2,
      - None when nothing matches.
    """
    if combined_limit is None:
        combined_limit = SEARCH_CANDIDATE_LIMIT

    n_artist = _normalize_artist_for_search(artist) if artist else ""
    n_title = _normalize_title_for_search(title) if title else ""
    n_album = _normalize_title_for_search(album) if album else ""

    artist_tokens = _tokens(n_artist)
    title_tokens = _tokens(n_title)
    album_tokens = _tokens(n_album)
    remixer_tokens = _extract_remixer_tokens_from_title(title or "")

    def joined_artist_names(item: dict) -> str:
        # Tolerate null "artists" lists, null entries and null names.
        return " ".join((a or {}).get("name") or "" for a in (item.get("artists") or []))

    def is_acceptable(cand_title: str, cand_artist_norm: str, cand_album_name: str) -> bool:
        title_ok = _tokens_in_candidate(title_tokens, cand_title)
        artist_ok = (
            not artist_tokens
            or _tokens_in_candidate(artist_tokens, cand_artist_norm)
            or (bool(remixer_tokens) and _tokens_in_candidate(remixer_tokens, cand_artist_norm))
        )
        album_ok = (not album_tokens) or _tokens_in_candidate(album_tokens, cand_album_name)
        return bool(title_ok and artist_ok and album_ok)

    if PRINT_SEARCH_INFO:
        logging.info("Sanitized search input: artist='%s' | title='%s' | album='%s'", n_artist, n_title, n_album)

    # Build queries: fielded first, then plain
    queries: List[Tuple[str, str]] = []
    queued = set()

    def enqueue(kind: str, q: str) -> None:
        # _build_sanitized_query returns the literal '""' when every field is
        # empty; sending that to Spotify is a wasted (and misleading) request.
        if not q or q == '""' or q in queued:
            return
        queued.add(q)
        queries.append((kind, q))

    enqueue("track", _build_sanitized_query(n_artist, n_title, n_album, fielded=True))
    enqueue("track", _build_sanitized_query(n_artist, n_title, "", fielded=True))
    enqueue("album", _build_sanitized_query(n_artist, "", n_album, fielded=True))
    enqueue("track", _build_sanitized_query("", n_title, "", fielded=True))
    enqueue("album", _build_sanitized_query("", "", n_album, fielded=True))
    enqueue("track", _build_sanitized_query(n_artist, n_title, n_album, fielded=False))

    seen_keys = set()
    overall_idx = 0  # number of distinct candidates examined so far, across all queries

    for (kind, q) in queries:
        if PRINT_SEARCH_INFO:
            logging.info("Query base: '%s' | type=%s | target=%d", q, kind, combined_limit)
        offset = 0
        while True:
            per_request = min(SPOTIFY_MAX_LIMIT, combined_limit - overall_idx)
            if per_request <= 0:
                break
            if PRINT_SEARCH_INFO:
                logging.info("Searching Spotify: q='%s' type=%s limit=%d offset=%d market=%s", q, kind, per_request, offset, MARKET)
            j = spotifysearch(token, q, type_=kind, limit=per_request, offset=offset, market=MARKET)
            if not j:
                break
            items = (j.get(kind + "s") or {}).get("items") or []
            if not isinstance(items, list) or not items:
                break
            for it in items:
                if not isinstance(it, dict):
                    # skip null placeholders in the result list
                    continue
                cand_title = _normalize_text_basic(it.get("name"))
                if kind == "track":
                    cand_artist_norm = _normalize_artist_for_search(joined_artist_names(it))
                    cand_album_name = _normalize_title_for_search((it.get("album") or {}).get("name"))
                else:
                    # album item: its own name plays both the title and the album role
                    cand_artist_norm = _normalize_text_basic(joined_artist_names(it))
                    cand_album_name = cand_title
                it_id = it.get("id")
                key = f"id:{it_id}" if it_id else f"key:{cand_title}|{cand_artist_norm}|{cand_album_name}"
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                overall_idx += 1
                if PRINT_SEARCH_INFO:
                    logging.info("Candidate #%d: title='%s' | artist='%s' | album='%s'", overall_idx, cand_title, cand_artist_norm, cand_album_name)
                accepted = is_acceptable(cand_title, cand_artist_norm, cand_album_name)
                if PRINT_SEARCH_INFO:
                    logging.info("ACCEPTED" if accepted else "REJECTED")
                if accepted:
                    return it
                if overall_idx >= combined_limit:
                    break
            if overall_idx >= combined_limit:
                break
            offset += per_request
            if len(items) < per_request:
                # short page: no more results for this query
                break
        if overall_idx >= combined_limit:
            break

    # Fallback: artist->albums->tracks exploration
    if not n_artist:
        return None
    artist_search_q = f'artist:"{n_artist}"'
    if PRINT_SEARCH_INFO:
        logging.info("Fallback artist search: %s", artist_search_q)
    artist_resp = spotifysearch(token, artist_search_q, type_="artist", limit=1, offset=0, market=MARKET)
    try:
        artist_items = artist_resp.get("artists", {}).get("items", []) if artist_resp else []
    except Exception:
        artist_items = []
    if not artist_items:
        return None

    artist_id = (artist_items[0] or {}).get("id")
    if PRINT_SEARCH_INFO:
        logging.info("Found artist id=%s; enumerating albums", artist_id)
    if not artist_id:
        return None

    a_off = 0
    while True:
        a_resp = spotify_get_artist_albums(token, artist_id, limit=SPOTIFY_MAX_LIMIT, offset=a_off, market=MARKET)
        if not a_resp:
            break
        albums = a_resp.get("items", []) or []
        if not albums:
            break
        for alb in albums:
            if not isinstance(alb, dict):
                continue
            alb_id = alb.get("id")
            if not alb_id:
                continue
            cand_album_name = _normalize_title_for_search(alb.get("name"))
            t_off = 0
            while True:
                t_resp = spotify_get_album_tracks(token, alb_id, limit=SPOTIFY_MAX_LIMIT, offset=t_off, market=MARKET)
                if not t_resp:
                    break
                tracks = t_resp.get("items", []) or []
                if not tracks:
                    break
                for tr in tracks:
                    if not isinstance(tr, dict):
                        continue
                    tr_id = tr.get("id")
                    if tr_id and f"id:{tr_id}" in seen_keys:
                        # already examined (and rejected) earlier
                        continue
                    cand_title = _normalize_text_basic(tr.get("name"))
                    cand_artist_norm = _normalize_artist_for_search(joined_artist_names(tr))
                    if is_acceptable(cand_title, cand_artist_norm, cand_album_name):
                        return {"id": tr_id, "name": tr.get("name"), "artists": tr.get("artists", []), "album": {"name": alb.get("name")}}
                    seen_keys.add(f"id:{tr_id}" if tr_id else f"key:{cand_title}|{cand_artist_norm}|{cand_album_name}")
                if len(tracks) < SPOTIFY_MAX_LIMIT:
                    break
                t_off += SPOTIFY_MAX_LIMIT
        if len(albums) < SPOTIFY_MAX_LIMIT:
            break
        a_off += SPOTIFY_MAX_LIMIT
    return None

Tag writing

In [None]:
# -------------------------
# TAG WRITING / FORMAT-SPECIFIC HANDLERS
# -------------------------
def download_image_bytes(url: str) -> Optional[Tuple[bytes, str]]:
    """
    Download an image and return ``(content, mime_type)``, or None on failure.

    The MIME type is taken from the Content-Type header with any parameters
    (e.g. "; charset=...") removed, and defaults to "image/jpeg" when the
    header is missing or empty. An empty body is treated as a failure, so
    callers never receive a zero-byte cover. Failures are logged.
    """
    try:
        r = requests.get(url, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        content = r.content
        content_type = r.headers.get("Content-Type", "") or ""
    except Exception as exc:
        logging.warning("Could not download image %s: %s", url, exc)
        return None
    if not content:
        logging.warning("Image %s was downloaded but is empty", url)
        return None
    # Keep only the "type/subtype" part of the header value.
    mime = content_type.split(";", 1)[0].strip().lower() or "image/jpeg"
    return content, mime


def get_artist_genres(token: str, artist_id: str) -> List[str]:
    """
    Return the "genres" list of the artist object, or [] when unavailable.

    This stays best-effort (it never raises). Request and decoding failures are
    logged instead of being dropped silently, and an empty artist id
    short-circuits without issuing a request.
    """
    if not artist_id:
        return []
    try:
        headers = {"Authorization": f"Bearer {token}"}
        r = requests.get(SPOTIFY_ARTIST_URL.format(artist_id), headers=headers, timeout=REQUEST_TIMEOUT)
        if not r.ok:
            logging.warning("Could not fetch genres for artist %s: HTTP %s", artist_id, r.status_code)
            return []
        genres = r.json().get("genres", [])
    except Exception as exc:
        logging.warning("Could not fetch genres for artist %s: %s", artist_id, exc)
        return []
    return genres if isinstance(genres, list) else []


def remove_existing_pictures_generic(path: Path, audio_obj) -> None:
    """
    Best-effort removal of embedded pictures from *audio_obj*.

    - ID3 object: every APIC frame is deleted.
    - ".flac" path: ``clear_pictures()`` is tried first; if it is missing or
      fails, the ``pictures`` list is emptied in place.
    - anything else: nothing happens.

    Errors are swallowed.
    """
    ext = path.suffix.lower()
    try:
        if isinstance(audio_obj, ID3):
            try:
                audio_obj.delall("APIC")
            except Exception:
                pass
        elif ext == ".flac":
            cleared = False
            if hasattr(audio_obj, "clear_pictures"):
                try:
                    audio_obj.clear_pictures()
                    cleared = True
                except Exception:
                    pass
            # Second strategy only when the first one was unavailable or failed.
            if not cleared and hasattr(audio_obj, "pictures"):
                try:
                    audio_obj.pictures[:] = []
                except Exception:
                    pass
    except Exception:
        pass


def set_genre_on_audio(path: Path, audio_tmp, genres_list: List[str]) -> None:
    """
    Write (or clear) the genre on *audio_tmp*, choosing the tag layout by type/extension.

    The genres are joined with "; ". When the list is empty (or joins to an
    empty string) the existing genre is removed instead.

    - ID3 object: the TCON frame is replaced or removed.
    - ".flac": the "genre" tag is set; on clearing, "genre" and "genres" are removed.
    - ".wav": the keys IGNR, IGEN, GENR, GENRE are tried in order and the first
      assignment that does not raise wins; on clearing, all of them are removed.
    - any other object with a ``tags`` attribute: the "genre" tag is set or removed.

    Errors are swallowed.
    """
    ext = path.suffix.lower()
    genre_value = "; ".join(genres_list) if genres_list else None
    try:
        if isinstance(audio_tmp, ID3):
            # The old frame goes away in both cases; a new one is added only when there is a value.
            try:
                audio_tmp.delall("TCON")
            except Exception:
                pass
            if genre_value:
                audio_tmp.add(TCON(encoding=3, text=genre_value))
        elif ext == ".flac":
            if audio_tmp.tags is None:
                audio_tmp.tags = {}
            if genre_value:
                audio_tmp.tags["genre"] = [genre_value]
            else:
                for stale_key in ("genre", "genres"):
                    if stale_key in audio_tmp.tags:
                        del audio_tmp.tags[stale_key]
        elif ext == ".wav":
            if getattr(audio_tmp, "tags", None) is None:
                audio_tmp.tags = {}
            # NOTE(review): if the tags container only accepts frame objects, every
            # assignment below raises and nothing is written - confirm against the
            # tag type actually used for WAV files.
            for info_key in ("IGNR", "IGEN", "GENR", "GENRE"):
                try:
                    if genre_value:
                        audio_tmp.tags[info_key] = [genre_value]
                        break
                    if info_key in audio_tmp.tags:
                        del audio_tmp.tags[info_key]
                except Exception:
                    continue
        elif hasattr(audio_tmp, "tags"):
            if audio_tmp.tags is None:
                audio_tmp.tags = {}
            if genre_value:
                audio_tmp.tags["genre"] = [genre_value]
            else:
                try:
                    if "genre" in audio_tmp.tags:
                        del audio_tmp.tags["genre"]
                except Exception:
                    pass
    except Exception:
        pass


# Helpers to build RIFF LIST/INFO and ID3 bytes
def _encode_text_for_info(s: str) -> bytes:
    """
    Encode *s* as the payload of a RIFF INFO text sub-chunk.

    INFO text values are NUL-terminated strings, so a terminator is always
    appended; a second NUL is added when needed to keep the payload length even
    (RIFF chunks are word-aligned). Previously an even-length string was
    emitted without any terminator.
    """
    encoded = s.encode("utf-8") + b"\x00"
    if len(encoded) % 2:
        encoded += b"\x00"
    return encoded


def build_info_list_chunk(metadata: dict) -> bytes:
    """
    Serialize *metadata* into a complete RIFF "LIST"/"INFO" chunk.

    The keys title, artist, album, date, track, disc and genre are written (in
    that order) as the sub-chunks INAM, IART, IPRD, ICRD, ITRK, TPOS and IGNR.
    Falsy values are skipped. Returns b"" when nothing would be written.
    """
    field_ids = (
        ("INAM", "title"),
        ("IART", "artist"),
        ("IPRD", "album"),
        ("ICRD", "date"),
        ("ITRK", "track"),
        ("TPOS", "disc"),
        ("IGNR", "genre"),
    )
    pieces = []
    for chunk_id, meta_key in field_ids:
        value = metadata.get(meta_key)
        if not value:
            continue
        payload = _encode_text_for_info(str(value))
        pieces.append(chunk_id.encode('ascii') + struct.pack('<I', len(payload)) + payload)
    if not pieces:
        return b""
    # The LIST size covers the "INFO" type tag plus all sub-chunks.
    body = b"INFO" + b"".join(pieces)
    return b"LIST" + struct.pack('<I', len(body)) + body


def build_id3_bytes_for_wav(image_bytes: Optional[bytes], mime: Optional[str], metadata: dict) -> bytes:
    """
    Render an in-memory ID3 tag (saved with v2_version=3) for embedding in a WAV file.

    Contents:
      - an APIC front-cover frame when *image_bytes* is given (MIME defaults to "image/jpeg"),
      - TIT2, TPE1, TALB, TDRC, TRCK, TPOS and TCON for the metadata keys
        title, artist, album, date, track, disc and genre,
      - TSRC for "isrc" (a failure to add it is ignored).
    Text frames use encoding=1 (UTF-16) for better WAV+Mp3tag compatibility.

    The result is padded with one NUL byte to an even length. On any error b"" is returned.
    """
    tag = ID3()
    try:
        text_frames = (
            ("title", TIT2),
            ("artist", TPE1),
            ("album", TALB),
            ("date", TDRC),
            ("track", TRCK),
            ("disc", TPOS),
            ("genre", TCON),
        )
        if image_bytes:
            tag.add(APIC(encoding=3, mime=mime or "image/jpeg", type=3, desc="Cover", data=image_bytes))
        # textual frames in UTF-16
        for meta_key, frame_cls in text_frames:
            if metadata.get(meta_key):
                tag.add(frame_cls(encoding=1, text=str(metadata[meta_key])))
        if metadata.get("isrc"):
            try:
                tag.add(TSRC(encoding=1, text=str(metadata["isrc"])))
            except Exception:
                pass
        buffer = io.BytesIO()
        tag.save(buffer, v2_version=3)
        rendered = buffer.getvalue()
        if len(rendered) % 2 == 1:
            rendered += b'\x00'
        return rendered
    except Exception:
        return b""


# RIFF helpers
def find_first_riff_offset(b: bytes) -> int:
    """Return the index of the first b"RIFF" signature in *b*, or -1 when absent."""
    riff_signature = b"RIFF"
    return b.find(riff_signature)


def parse_riff_chunks_and_find_data_offset(b: bytes, start_offset: int = 0):
    """
    Walk the chunks of the RIFF container starting at *start_offset* and locate "data".

    Returns a 3-tuple ``(data_chunk_offset, data_size, riff_size_field_offset)``:
      - ``data_chunk_offset`` is the offset of the "data" chunk header, or -1 when
        there is no RIFF header at *start_offset* or no "data" chunk was found,
      - ``data_size`` is the size stored in that header (None when not found),
      - ``riff_size_field_offset`` is always ``start_offset + 4``.
    """
    size_field_offset = start_offset + 4
    not_found = (-1, None, size_field_offset)

    header_end = start_offset + 12  # "RIFF" + size + form type
    if len(b) < header_end or b[start_offset:start_offset + 4] != b"RIFF":
        return not_found

    cursor = header_end
    total = len(b)
    while cursor + 8 <= total:
        chunk_id, chunk_size = struct.unpack_from('<4sI', b, cursor)
        if chunk_id == b"data":
            return cursor, chunk_size, size_field_offset
        # header + payload + pad byte for odd-sized payloads
        cursor += 8 + chunk_size + (chunk_size & 1)
    return not_found


def insert_chunk_before_data(original_bytes: bytes, chunk_id: bytes, chunk_data: bytes) -> bytes:
    """
    Return a copy of a RIFF buffer with a new chunk placed just before "data".

    The chunk is appended at the end of the buffer when no "data" chunk is
    found. The RIFF size field is increased by the number of bytes added.

    RIFF chunks must start on even offsets: when *chunk_data* has an odd length
    a single NUL pad byte is written after it. The pad is counted in the RIFF
    size but not in the chunk's own size field. Previously an odd-length payload
    misaligned every chunk that followed.

    Raises:
        RuntimeError: when no "RIFF" signature is present.
    """
    riff_off = find_first_riff_offset(original_bytes)
    if riff_off == -1:
        raise RuntimeError("RIFF header not found in file")
    data_off, _, _ = parse_riff_chunks_and_find_data_offset(original_bytes, riff_off)

    payload = bytes(chunk_data)
    pad = b"\x00" if len(payload) % 2 else b""
    chunk = bytes(chunk_id) + struct.pack('<I', len(payload)) + payload + pad

    patched = bytearray(original_bytes)
    orig_riff_size = struct.unpack_from('<I', patched, riff_off + 4)[0]
    struct.pack_into('<I', patched, riff_off + 4, orig_riff_size + len(chunk))

    insert_at = len(patched) if data_off == -1 else data_off
    patched[insert_at:insert_at] = chunk
    return bytes(patched)


def strip_id3_and_list_info(orig_bytes: bytes) -> bytes:
    """
    Rebuild a RIFF/WAVE buffer without its ID3 chunks and LIST/INFO chunks.

    Removed chunks:
      - any chunk whose ID is "id3 " in either letter case (some writers emit
        "ID3 "; leaving it behind would duplicate the tag),
      - any "LIST" chunk whose list type is "INFO".
    All other chunks are kept in order, including their pad byte.

    The input is returned unchanged when it has no "RIFF" signature or is
    shorter than a RIFF header. Otherwise the buffer is always rebuilt: it
    starts at the "RIFF" signature (bytes before it are dropped), the RIFF size
    field is recomputed, and fewer than 8 trailing bytes after the last chunk
    are dropped. A chunk whose declared size runs past the end of the buffer is
    kept verbatim together with everything after it.
    """
    riff_off = orig_bytes.find(b"RIFF")
    if riff_off == -1 or len(orig_bytes) < riff_off + 12:
        return orig_bytes

    end = len(orig_bytes)
    off = riff_off + 12
    kept = []
    while off + 8 <= end:
        cid, sz = struct.unpack_from('<4sI', orig_bytes, off)
        data_start = off + 8
        data_end = data_start + sz
        if data_end > end:
            # malformed - keep rest and stop
            kept.append(bytes(orig_bytes[off:end]))
            break
        next_off = data_end + (sz % 2)  # skip the pad byte of odd-sized chunks
        is_id3 = cid.lower() == b"id3 "
        is_info_list = cid == b"LIST" and orig_bytes[data_start:data_start + 4] == b"INFO"
        if not (is_id3 or is_info_list):
            kept.append(bytes(orig_bytes[off:next_off]))
        off = next_off

    body = b"".join(kept)
    form_type = bytes(orig_bytes[riff_off + 8:riff_off + 12])  # keep the original form type ("WAVE")
    # RIFF size = form type (4 bytes) + kept chunks
    return b"RIFF" + struct.pack('<I', 4 + len(body)) + form_type + body

CORE (update metadata for a single file)

In [None]:
# -------------------------
# CORE: update metadata for a single file (handles FLAC/MP3/WAV)
# -------------------------
def overwrite_metadata_with_spotify(file_path: Path, token: str) -> bool:
    """
    Look up a track on Spotify and rewrite the file's tags and cover art from the match.

    Steps:
    - Read existing tags (FLAC / ID3 / WAVE) and derive artist, title, album and ISRC,
      falling back to the filename for artist/title.
    - Search Spotify: by ISRC first when one is tagged, otherwise via
      spotify_find_best_match.
    - Work on a temp copy of the file and write tags there:
        * FLAC: Vorbis comments + front-cover picture (previous pictures removed first)
        * MP3: ID3 frames + APIC (previous APIC removed first)
        * WAV: rebuild a clean WAV from the audio frames, then insert a LIST/INFO chunk
               and an 'id3 ' chunk (pre-existing id3/LIST INFO chunks are stripped first)
    - Send the original to the trash and move the temp copy into its place.

    Every temp file created here is removed on all failure paths, and a failed
    tag save aborts the update so the original is never replaced by an untagged copy.

    NOTE(review): OVERWRITE_TITLE_ARTIST_OR_ALBUM and UPDATE_ONLY_GENRE are not
    consulted anywhere in this function - confirm whether that is intended.

    Args:
        file_path: audio file to update; only .flac, .mp3 and .wav suffixes
            (case-insensitive) are handled.
        token: Spotify access token handed to the search helpers.

    Returns:
        True when the tagged copy replaced the original. False when the format is
        unsupported, the file cannot be opened, there is neither artist nor title,
        no Spotify match is found, or any write step fails.
    """
    ext = file_path.suffix.lower()
    audio = None
    try:
        if ext == ".flac":
            audio = FLAC(str(file_path))
        elif ext == ".mp3":
            try:
                audio = ID3(str(file_path))
            except ID3NoHeaderError:
                audio = ID3()
        elif ext == ".wav":
            # Try to load an ID3 tag first; fall back to mutagen's WAVE reader,
            # and to "no tags at all" when that fails too.
            try:
                audio = ID3(str(file_path))
            except ID3NoHeaderError:
                try:
                    audio = WAVE(str(file_path))
                except Exception:
                    audio = None
        else:
            logging.info("Unsupported format: %s", file_path.name)
            return False
    except Exception as e:
        logging.error("Could not open %s: %s", file_path.name, e)
        return False

    # collect tags generically
    tags = None
    try:
        if isinstance(audio, FLAC):
            tags = audio.tags or {}
        elif isinstance(audio, ID3):
            tags = audio
        elif isinstance(audio, WAVE):
            tags = getattr(audio, "tags", {}) or {}
        else:
            tags = {}
    except Exception:
        tags = {}

    def _val_to_str(v):
        """Best-effort conversion of a tag value (frame, list of frames, or scalar) to str."""
        try:
            if v is None:
                return None
            if isinstance(v, (list, tuple)) and v:
                v0 = v[0]
                if hasattr(v0, "text"):
                    txt = getattr(v0, "text")
                    if isinstance(txt, (list, tuple)):
                        return str(txt[0])
                    return str(txt)
                return str(v0)
            if hasattr(v, "text"):
                txt = getattr(v, "text")
                if isinstance(txt, (list, tuple)):
                    return str(txt[0])
                return str(txt)
            return str(v)
        except Exception:
            try:
                return str(v)
            except Exception:
                return None

    def first_tag_generic(k):
        """Return the first value stored for logical key *k*, or None when absent."""
        try:
            if isinstance(audio, FLAC):
                v = tags.get(k)
                if v:
                    if isinstance(v, (list, tuple)):
                        return str(v[0])
                    return str(v)
                return None
            if isinstance(audio, ID3):
                map_frames = {"artist":"TPE1","albumartist":"TPE2","album":"TALB","title":"TIT2","date":"TDRC","tracknumber":"TRCK","discnumber":"TPOS","isrc":"TSRC","genre":"TCON"}
                frame = map_frames.get(k)
                if frame and frame in tags:
                    f = tags.getall(frame)
                    if f:
                        try:
                            txt = f[0].text
                            if isinstance(txt,(list,tuple)):
                                return str(txt[0])
                            return str(txt)
                        except Exception:
                            try:
                                return str(f[0])
                            except Exception:
                                return None
                return None
            # WAVE / RIFF INFO fallback (and ID3-like frames stored in WAVE.tags)
            if getattr(tags, "get", None):
                v = tags.get(k)
                if v:
                    return _val_to_str(v)
                alt_keys = {
                    "title":["INAM","NAME","TITLE","TIT2"],
                    "artist":["IART","AUTH","ARTIST","TPE1"],
                    "album":["IPRD","ALBUM","TALB"],
                    "date":["ICRD","DATE","YEAR","TDRC"],
                    "tracknumber":["ITRK","TRACKNUMBER","TRCK"],
                    "discnumber":["TPOS","DISCNUMBER"],
                    "isrc":["TSRC","ISRC"],
                    "genre":["IGEN","IGNR","GENR","GENRE","TCON"]
                }
                for alt in alt_keys.get(k, []):
                    try:
                        vv = tags.get(alt)
                        if vv:
                            return _val_to_str(vv)
                    except Exception:
                        continue
            return None
        except Exception:
            return None

    artist = first_tag_generic("artist") or first_tag_generic("albumartist")
    album = first_tag_generic("album")
    title = first_tag_generic("title")
    isrc_tag = first_tag_generic("isrc") or first_tag_generic("ISRC")

    # filename fallback
    ai, ti = infer_artist_title_from_filename(file_path)
    artist = artist or ai
    title = title or ti

    if not artist and not title:
        logging.info("Insufficient metadata for: %s", file_path.name)
        return False

    if PRINT_SEARCH_INFO:
        if artist and title:
            logging.info("Searching with artist+title (both available).")
        elif title and not artist:
            logging.info("Searching with title only.")
        elif artist and not title:
            logging.info("Searching with artist only.")

    artist_for_search = _strip_parentheses_with_feat(artist) if artist else None
    title_for_search = _strip_parentheses_with_feat(title) if title else None
    album_for_search = _strip_parentheses_with_feat(album) if album else None

    if PRINT_SEARCH_INFO:
        sanitized_artist = _normalize_artist_for_search(artist_for_search) if artist_for_search else ""
        sanitized_title = _normalize_title_for_search(title_for_search) if title_for_search else ""
        sanitized_album = _normalize_title_for_search(album_for_search) if album_for_search else ""
        logging.info("Search input (sanitized): artist='%s' | title='%s' | album='%s' | ext=%s", sanitized_artist, sanitized_title, sanitized_album, ext)

    match = None
    # ISRC first
    if isrc_tag:
        isrc_q = f'isrc:"{isrc_tag.strip()}"'
        if PRINT_SEARCH_INFO:
            logging.info("Attempting ISRC search: %s", isrc_q)
        try:
            j = spotifysearch(token, isrc_q, type_="track", limit=1, offset=0, market=MARKET)
            if j:
                items = j.get("tracks", {}).get("items", [])
                if items:
                    match = items[0]
        except Exception:
            # Best-effort: a failed ISRC lookup just falls through to the text search.
            match = None

    # containment search fallback
    if not match:
        match = spotify_find_best_match(token, artist_for_search, album_for_search, title_for_search, combined_limit=SEARCH_CANDIDATE_LIMIT)

    if not match:
        logging.info("No Spotify match for: %s", file_path.name)
        return False

    # Extract fields from Spotify match
    meta_artist = None
    meta_title = None
    meta_album = None
    meta_date = None
    meta_track = None
    meta_disc = None
    image_url = None
    artist_id = None
    genres_list: List[str] = []

    if "album" in match and "name" in match:
        # Track-shaped result: album details are nested under "album".
        meta_title = match.get("name")
        album_info = match.get("album", {})
        meta_album = album_info.get("name")
        artists = match.get("artists", [])
        if artists:
            meta_artist = artists[0].get("name")
            artist_id = artists[0].get("id")
        meta_track = str(match.get("track_number")) if match.get("track_number") else None
        meta_disc = str(match.get("disc_number")) if match.get("disc_number") else None
        images = album_info.get("images", [])
        if images:
            image_url = images[0].get("url")
        meta_date = album_info.get("release_date")
        if isinstance(album_info.get("genres"), list) and album_info.get("genres"):
            genres_list = album_info.get("genres", [])
    else:
        # Result without a nested "album": its own name is used as the album name.
        meta_album = match.get("name")
        artists = match.get("artists", [])
        if artists:
            meta_artist = artists[0].get("name")
            artist_id = artists[0].get("id")
        images = match.get("images", [])
        if images:
            image_url = images[0].get("url")
        meta_date = match.get("release_date")
        if isinstance(match.get("genres"), list) and match.get("genres"):
            genres_list = match.get("genres", [])

    if PRINT_SEARCH_INFO:
        candidate_album = None
        if "album" in match:
            candidate_album = (match.get("album") or {}).get("name")
        else:
            candidate_album = match.get("name")
        logging.info("Spotify candidate: spotify_title='%s' | spotify_artist='%s' | spotify_album='%s'", meta_title, meta_artist, candidate_album)

    # optionally fetch artist genres
    if artist_id:
        try:
            g = get_artist_genres(token, artist_id)
            if g:
                genres_list = g
        except Exception:
            pass

    if not genres_list and meta_artist:
        try:
            j = spotifysearch(token, f'artist:"{meta_artist}"', type_="artist", limit=1, offset=0, market=MARKET)
            if j:
                items = j.get("artists", {}).get("items", [])
                if items:
                    gg = items[0].get("genres", [])
                    if isinstance(gg, list) and gg:
                        genres_list = gg
        except Exception:
            pass

    # metadata map (Spotify values win; existing tag / filename values are the fallback)
    metadata_map = {
        "title": meta_title or title or "",
        "artist": meta_artist or artist or "",
        "album": meta_album or album or "",
        "date": meta_date or "",
        "track": meta_track or "",
        "disc": meta_disc or "",
        "genre": "; ".join(genres_list) if genres_list else "",
        "isrc": isrc_tag or ""
    }

    temp_path = None      # working copy that eventually replaces the original
    tmp_out_name = None   # rebuilt clean WAV (WAV branch only)
    try:
        temp_path = unique_temp_copy(file_path)

        # FLAC
        if ext == ".flac":
            audio_tmp = FLAC(str(temp_path))
            if audio_tmp.tags is None:
                # add_tags() attaches a real Vorbis comment block; a plain dict
                # assigned to .tags would not be part of the saved metadata.
                audio_tmp.add_tags()
            audio_tmp.tags["title"] = [metadata_map["title"]]
            audio_tmp.tags["artist"] = [metadata_map["artist"]]
            if metadata_map["album"]:
                audio_tmp.tags["album"] = [metadata_map["album"]]
            if metadata_map["date"]:
                audio_tmp.tags["date"] = [metadata_map["date"]]
            if metadata_map["track"]:
                audio_tmp.tags["tracknumber"] = [metadata_map["track"]]
            if metadata_map["disc"]:
                audio_tmp.tags["discnumber"] = [metadata_map["disc"]]
            if metadata_map["genre"]:
                audio_tmp.tags["genre"] = [metadata_map["genre"]]
            if image_url:
                got = download_image_bytes(image_url)
                if got:
                    image_bytes, mime = got
                    pic = Picture()
                    pic.data = image_bytes
                    pic.type = 3
                    pic.mime = mime
                    try:
                        # remove existing pictures to avoid duplication
                        # NOTE(review): the helper receives the original path, not temp_path - confirm.
                        remove_existing_pictures_generic(file_path, audio_tmp)
                    except Exception:
                        pass
                    try:
                        audio_tmp.add_picture(pic)
                    except Exception:
                        # Cover art is best-effort; text tags are still written.
                        pass
            try:
                audio_tmp.save()
            except Exception:
                # Retry with an explicit path. A second failure propagates to the
                # outer handler so the original file is left in place.
                audio_tmp.save(str(temp_path))

        # MP3
        elif ext == ".mp3":
            try:
                audio_tmp = ID3(str(temp_path))
            except ID3NoHeaderError:
                audio_tmp = ID3()
            audio_tmp.delall("TIT2"); audio_tmp.add(TIT2(encoding=3, text=metadata_map["title"]))
            audio_tmp.delall("TPE1"); audio_tmp.add(TPE1(encoding=3, text=metadata_map["artist"]))
            if metadata_map["album"]:
                audio_tmp.delall("TALB"); audio_tmp.add(TALB(encoding=3, text=metadata_map["album"]))
            if metadata_map["date"]:
                audio_tmp.delall("TDRC"); audio_tmp.add(TDRC(encoding=3, text=metadata_map["date"]))
            if metadata_map["track"]:
                audio_tmp.delall("TRCK"); audio_tmp.add(TRCK(encoding=3, text=metadata_map["track"]))
            if metadata_map["disc"]:
                audio_tmp.delall("TPOS"); audio_tmp.add(TPOS(encoding=3, text=metadata_map["disc"]))
            if metadata_map["genre"]:
                audio_tmp.delall("TCON"); audio_tmp.add(TCON(encoding=3, text=metadata_map["genre"]))
            if metadata_map["isrc"]:
                try:
                    audio_tmp.delall("TSRC"); audio_tmp.add(TSRC(encoding=3, text=metadata_map["isrc"]))
                except Exception:
                    pass
            if image_url:
                got = download_image_bytes(image_url)
                if got:
                    image_bytes, mime = got
                    try:
                        # remove existing APICs/pictures first
                        # NOTE(review): the helper receives the original path, not temp_path - confirm.
                        remove_existing_pictures_generic(file_path, audio_tmp)
                    except Exception:
                        pass
                    try:
                        audio_tmp.add(APIC(encoding=3, mime=mime, type=3, desc="Cover", data=image_bytes))
                    except Exception:
                        # Cover art is best-effort; text frames are still written.
                        pass
            try:
                audio_tmp.save(str(temp_path))
            except Exception:
                # Retry against the filename the tag was loaded from. A second failure
                # propagates to the outer handler so the original file is left in place.
                audio_tmp.save()

        # WAV
        elif ext == ".wav":
            # Read original temp copy bytes
            orig_bytes = Path(temp_path).read_bytes()

            # detect ID3v2 header at very start (syncsafe size)
            def parse_id3v2_header_size(head_bytes: bytes) -> int:
                if len(head_bytes) < 10 or head_bytes[:3] != b"ID3":
                    return 0
                sz_bytes = head_bytes[6:10]
                size = (sz_bytes[0] & 0x7F) << 21 | (sz_bytes[1] & 0x7F) << 14 | (sz_bytes[2] & 0x7F) << 7 | (sz_bytes[3] & 0x7F)
                flags = head_bytes[5]
                footer = 10 if (flags & 0x10) else 0
                return 10 + size + footer

            head = orig_bytes[:65536]
            id3_head_size = parse_id3v2_header_size(head)
            if id3_head_size > 0:
                logging.info("WAV: detected ID3v2 header at start (size=%d). Skipping it for wave parsing.", id3_head_size)
                candidate_bytes = orig_bytes[id3_head_size:]
            else:
                riff_idx = head.find(b"RIFF")
                if riff_idx > 0:
                    logging.info("WAV: RIFF found at offset %d in header; using slice.", riff_idx)
                    candidate_bytes = orig_bytes[riff_idx:]
                else:
                    # riff_idx == 0 is the normal case (file starts with RIFF) and needs
                    # no message; only report when RIFF is genuinely absent from the head.
                    if riff_idx < 0:
                        logging.info("WAV: no ID3 at start and no RIFF in first 64KB; using entire file as candidate.")
                    candidate_bytes = orig_bytes

            # validate candidate contains RIFF
            if candidate_bytes.find(b"RIFF") == -1:
                logging.error("WAV candidate does not contain RIFF; aborting WAV write.")
                return False

            # Write candidate to tmp_in and re-build a clean WAV to ensure proper chunk layout
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_in:
                tmp_in_name = tmp_in.name
                tmp_in.write(candidate_bytes)
            try:
                with wave.open(tmp_in_name, 'rb') as r:
                    params = r.getparams()
                    frames = r.readframes(r.getnframes())
                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_out:
                    tmp_out_name = tmp_out.name
                with wave.open(tmp_out_name, 'wb') as w:
                    w.setparams(params)
                    w.writeframes(frames)
            except Exception as e:
                logging.error("WAV rebuild failed: %s", e)
                return False
            finally:
                # tmp_in is only needed for the rebuild itself.
                try:
                    Path(tmp_in_name).unlink(missing_ok=True)
                except Exception:
                    pass

            # Prepare LIST/INFO chunk and ID3 bytes
            list_chunk = build_info_list_chunk(metadata_map)
            image_bytes = None
            image_mime = None
            if image_url:
                got = download_image_bytes(image_url)
                if got:
                    image_bytes, image_mime = got
            id3_bytes = build_id3_bytes_for_wav(image_bytes, image_mime, metadata_map)

            # Read the clean WAV we wrote
            clean_bytes = Path(tmp_out_name).read_bytes()

            # Strip any pre-existing id3 / LIST INFO chunks to avoid duplicates
            try:
                clean_bytes = strip_id3_and_list_info(clean_bytes)
            except Exception as e:
                logging.info("WAV: failed to strip existing id3/LIST chunks (non-fatal): %s", e)

            if list_chunk:
                try:
                    # list_chunk already contains its 'LIST' header + size; we pass only the "INFO"+subchunks as chunk_data
                    clean_bytes = insert_chunk_before_data(clean_bytes, b"LIST", list_chunk[8:])  # pass only 'INFO'+subchunks as data
                    logging.info("WAV: LIST/INFO chunk inserted.")
                except Exception as e:
                    logging.info("WAV: failed to insert LIST chunk: %s", e)
            if id3_bytes:
                try:
                    clean_bytes = insert_chunk_before_data(clean_bytes, b"id3 ", id3_bytes)
                    logging.info("WAV: 'id3 ' chunk (ID3v2.3 with APIC + textual frames) inserted.")
                except Exception as e:
                    logging.info("WAV: failed to insert id3 chunk: %s", e)

            # Overwrite temp_path with new bytes
            try:
                Path(temp_path).write_bytes(clean_bytes)
            except Exception as e:
                logging.error("Failed writing final WAV bytes to temp file: %s", e)
                return False

        else:
            logging.info("Unsupported for write: %s", file_path.name)
            return False

        # Replace original: send original to trash and move temp into place
        send_original_to_trash(file_path)
        shutil.move(str(temp_path), str(file_path))
        temp_path = None  # the copy now IS the file; nothing left to clean up
        return True

    except Exception as e:
        logging.error("Failed updating %s: %s", file_path.name, e)
        return False

    finally:
        # Runs on every exit path (success, early return, exception) so no
        # working copy or rebuilt WAV is left behind.
        for leftover in (temp_path, tmp_out_name):
            if leftover:
                try:
                    Path(leftover).unlink(missing_ok=True)
                except OSError:
                    pass

File iteration + MAIN

In [None]:
# -------------------------
# FILE ITERATION + MAIN
# -------------------------
def iter_audio_files(root: Path, recursive: bool):
    """
    Yield the FLAC, MP3 and WAV files found under *root*.

    Suffixes are compared case-insensitively (so 'SONG.MP3' is found on
    case-sensitive file systems too), which matches how the per-file update
    lowercases the suffix. Only regular files are yielded; a directory whose
    name happens to end in an audio suffix is ignored.

    Args:
        root: directory to scan.
        recursive: when True, subdirectories are scanned as well.

    Yields:
        Path objects, grouped as all .flac first, then .mp3, then .wav.
    """
    suffix_order = (".flac", ".mp3", ".wav")
    buckets = {suffix: [] for suffix in suffix_order}

    entries = root.rglob("*") if recursive else root.glob("*")
    for entry in entries:
        bucket = buckets.get(entry.suffix.lower())
        if bucket is not None and entry.is_file():
            bucket.append(entry)

    for suffix in suffix_order:
        yield from buckets[suffix]


def get_creation_time(path: Path) -> float:
    """
    Return a creation-like timestamp for *path*, in seconds as a float.

    Uses st_birthtime when the stat result exposes it; otherwise st_ctime on
    Windows and st_mtime everywhere else. Any failure (for example a missing
    file) yields 0.0 instead of raising.
    """
    try:
        info = path.stat()
        if hasattr(info, "st_birthtime"):
            field = "st_birthtime"
        elif platform.system() == "Windows":
            field = "st_ctime"
        else:
            field = "st_mtime"
        return float(getattr(info, field))
    except Exception:
        return 0.0


def main():
    """
    Batch entry point: tag the newest audio files in the configured music folder.

    Reads client_id, client_secret and the music directory (music_path,
    source_dir or music_dir) from CREDENTIALS_PATH, obtains a Spotify token,
    collects the audio files (honouring RECURSIVE), orders them newest first by
    get_creation_time, optionally keeps only the first PROCESS_TOP_X, and runs
    overwrite_metadata_with_spotify on each one. The token is refreshed once its
    expiry time has passed; a failed refresh stops the run. A summary of
    updated / skipped / failed counts is logged at the end.
    """
    try:
        settings = json.loads(CREDENTIALS_PATH.read_text(encoding="utf-8"))
        client_id = str(settings.get("client_id", "")).strip()
        client_secret = str(settings.get("client_secret", "")).strip()
        # First non-empty value among the accepted key names.
        music_path = next(
            (settings.get(key) for key in ("music_path", "source_dir", "music_dir") if settings.get(key)),
            None,
        )
        if not (client_id and client_secret):
            logging.error("Missing client_id or client_secret in credentials.json")
            return
        if not music_path:
            logging.error("Missing music_path in credentials.json")
            return
        music_dir = Path(music_path)
        if not (music_dir.exists() and music_dir.is_dir()):
            logging.error("music_path from credentials.json is not a valid directory: %s", music_dir)
            return
        logging.info("Loaded music path from credentials.json: %s", music_dir)

        token, expires_at = get_spotify_token(client_id, client_secret)
        logging.info("Spotify token obtained")
    except Exception as e:
        logging.error("Failed to load credentials or obtain token: %s", e)
        return

    def newest_first_key(candidate):
        # Files that vanished since the scan sort as oldest.
        return get_creation_time(candidate) if candidate.exists() else 0

    paths = sorted(iter_audio_files(music_dir, RECURSIVE), key=newest_first_key, reverse=True)
    total = len(paths)
    logging.info("Found %d audio files (FLAC/MP3/WAV) in %s", total, music_dir)

    if PROCESS_TOP_X and isinstance(PROCESS_TOP_X, int) and PROCESS_TOP_X > 0:
        paths = paths[:min(PROCESS_TOP_X, total)]

    batch_size = len(paths)
    tally = {"updated": 0, "skipped": 0, "failed": 0}
    for position, path in enumerate(paths, 1):
        try:
            if int(time.time()) >= expires_at:
                try:
                    token, expires_at = get_spotify_token(client_id, client_secret)
                except Exception:
                    logging.error("Failed to refresh Spotify token")
                    break

            logging.info("Processing (%d/%d): %s", position, batch_size, path.name)
            if not overwrite_metadata_with_spotify(path, token):
                tally["skipped"] += 1
                continue
            logging.info("Updated metadata for: %s (original moved to trash)", path.name)
            tally["updated"] += 1

        except KeyboardInterrupt:
            break
        except Exception as e:
            logging.error("Unexpected error processing %s: %s", path.name, e)
            tally["failed"] += 1

    logging.info(
        "Completed. Updated: %d, Skipped: %d, Failed: %d, Total found: %d",
        tally["updated"], tally["skipped"], tally["failed"], total,
    )


# Entry point: run the batch update only when this module is executed
# directly, not when it is imported by other code.
if __name__ == "__main__":
    main()