In [None]:
# Uncomment if needed

# ! pip3 install -r requirements.txt
# ! pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cu129

Imports and configuration

In [None]:
# -------------------------
# IMPORTS
# -------------------------
import os
import glob
import json
import re
import shutil
from typing import List, Tuple, Dict, Optional
import difflib
from collections import deque
import whisper
from mutagen.flac import FLAC
from lyricsgenius import Genius
from datetime import datetime

# -------------------------
# INIT & CONFIG PARAMETERS
# -------------------------
# Toggle debug verbosity. When DEBUG is True, many extra details are printed to stdout.
DEBUG = True

# Directory names
SONGS_FOLDER = "songs"           # where your audio files (.flac) live
LYRICS_FOLDER = "lyrics"         # output folder where generated .lrc files are written
LYRICS_DB_FOLDER = ".lyrics_db"  # persistent database of processed lyrics (normalized: "<Title> - <Artist>.lrc")
LOGS_FOLDER = ".logs"            # folder to collect all raw transcripts, timestamps, raw lyrics and human logs

# Credentials
CREDENTIALS_PATH = "credentials.json"

# Outputs
ANCHORS_LOG_PATH = os.path.join(LOGS_FOLDER, ".anchors.txt")

# Whisper model names to compare. Transcriptions will be produced for each model and compared.
TRANSCRIBE_MODELS = ["large-v3", "large-v3-turbo"]

# How much extra proportion of time the first non-anchored line receives in head interpolation.
# If there are M lines before the first anchored line, weights = [FIRST_LINE_WEIGHT, 1, 1, ..., 1] (length M)
FIRST_LINE_WEIGHT = 3

# Silence / thresholds
MIN_SILENCE_DURATION = 1.5        # minimal gap between word end and next word start to consider a "silence"
LONG_SILENCE_THRESHOLD = 10.0     # silence >= this is considered a "long silence" and blocks interpolation across it
THRESH_ANCHOR = 0.80              # minimal similarity score for an anchor candidate
MIN_OVERLAP = 0.60                # minimal fraction of lyric words present in matched transcription window
MIN_ANCHOR_SPACING = 2.0          # minimum seconds between accepted anchors (to avoid clustering anchors too close)

# Timestamp progression / fallback
MIN_LINE_PROGRESSION = 0.25       # minimal increment to enforce strictly increasing timestamps
FALLBACK_SPACING = 2.5            # spacing used when filling the tail region without active intervals

Utilities

In [None]:
# -------------------------
# INFO
# -------------------------
def dbg(msg: str):
    """Print `msg` prefixed with "[DEBUG]"; does nothing unless DEBUG is truthy."""
    if not DEBUG:
        return
    print("[DEBUG]", msg)

def info(msg: str):
    """
    Print an essential runtime status message prefixed with "[INFO]".
    This always prints (it does not consult DEBUG), so it should be reserved
    for messages worth showing on an otherwise quiet console.
    """
    tag = "[INFO]"
    print(tag, msg)

# -------------------------
# LOGS
# -------------------------
def init_anchors_log():
    """
    Append a "=== Run: <timestamp> ===" header line to the anchors log.

    Creates LOGS_FOLDER if needed. Intended to be called once at the start of
    a run. Any failure is reported through dbg() and otherwise ignored.
    """
    try:
        os.makedirs(LOGS_FOLDER, exist_ok=True)
        started_at = datetime.now().isoformat(sep=" ", timespec="seconds")
        with open(ANCHORS_LOG_PATH, "a", encoding="utf-8") as log_file:
            log_file.write(f"=== Run: {started_at} ===\n")
    except Exception as err:
        dbg(f"init_anchors_log error: {err}")

def append_anchor_entry(basename: str, status: str, extra: str = ""):
    """
    Append one song-level line to the anchors log.

    Line format (tab-separated): <timestamp>\t<basename>\t<status>[\t<extra>]\n
      - basename: e.g. "038 - Dance With Me"
      - status: e.g. "large-v3", "all models failed", "lyrics retrieving failed", "restored_from_db"
      - extra: optional additional info; the column is omitted when empty

    Any failure is reported through dbg() and otherwise ignored.
    """
    try:
        fields = [
            datetime.now().isoformat(sep=" ", timespec="seconds"),
            str(basename),
            str(status),
        ]
        extra_text = str(extra) if extra else ""
        if extra_text:
            fields.append(extra_text)
        with open(ANCHORS_LOG_PATH, "a", encoding="utf-8") as log_file:
            log_file.write("\t".join(fields) + "\n")
    except Exception as err:
        dbg(f"append_anchor_entry error: {err}")
    
# -------------------------
# DATA EXTRACTION
# -------------------------
def ensure_folders():
    """Create the songs, lyrics, lyrics-DB and logs folders when they are missing."""
    for folder in (SONGS_FOLDER, LYRICS_FOLDER, LYRICS_DB_FOLDER, LOGS_FOLDER):
        os.makedirs(folder, exist_ok=True)

def load_credentials(path: str = CREDENTIALS_PATH) -> dict:
    """
    Read the optional JSON credentials file (e.g. Genius token).

    Returns the parsed object when it is a dict. Returns an empty dict when the
    file is missing, cannot be read/parsed (a warning is printed), or holds
    something other than a JSON object.
    """
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as handle:
                payload = json.load(handle)
                if isinstance(payload, dict):
                    dbg("credentials loaded")
                    return payload
        except Exception as err:
            info(f"warning: error loading credentials: {err}")
    else:
        dbg(f"credentials not found at {path}")
    return {}

def find_flac_files(folder: str = SONGS_FOLDER) -> List[str]:
    """List the "*.flac" files located directly inside `folder`, sorted by path."""
    matches = glob.glob(os.path.join(folder, "*.flac"))
    matches.sort()
    return matches

def normalize_text(s: str) -> str:
    """
    Normalize text for comparison: lowercase, punctuation replaced by spaces
    (apostrophes are kept), whitespace runs collapsed and both ends trimmed.
    A falsy input (None, "") yields "".
    """
    lowered = (s or "").lower()
    # Anything that is not a word character, whitespace or an apostrophe becomes a space.
    depunctuated = re.sub(r"[^\w\s']", " ", lowered)
    # Splitting on whitespace and re-joining collapses runs and trims the ends.
    return " ".join(depunctuated.split())

def sanitize_filename(text: str) -> str:
    """
    Build a file-system-friendly name from `text`.

    Drops the characters < > : " / \\ | ? *, turns newlines/tabs into spaces,
    collapses whitespace runs, trims the ends, and caps the result at 200
    characters (re-trimmed after the cut).
    """
    cleaned = re.sub(r'[<>:"/\\|?*]', '', text)
    cleaned = re.sub(r'[\n\r\t]', ' ', cleaned)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned if len(cleaned) <= 200 else cleaned[:200].strip()

def split_lyrics_lines(lyrics: str) -> List[str]:
    """
    Split lyrics into stripped, non-empty lines.

    Lines that consist entirely of a bracketed marker (they start with "[" and
    end with "]", e.g. "[Chorus]") are dropped. A falsy input yields [].
    """
    stripped_lines = (raw.strip() for raw in (lyrics or "").splitlines())
    return [
        text
        for text in stripped_lines
        if text and not re.match(r'^\[.*\]$', text)
    ]

def extract_metadata_from_flac(path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Resolve (artist, title) for an audio file.

    When the file exists, its FLAC "artist" and "title" tags are read with
    mutagen and returned if both are non-empty. Otherwise (file missing, tags
    incomplete, or tag reading failed) the file name is parsed as
    "Artist - Title"; a name without " - " yields (None, <name without extension>).
    """
    if os.path.exists(path):
        try:
            tags = FLAC(path)
            artist = tags.get("artist", [None])[0]
            title = tags.get("title", [None])[0]
            if artist and title:
                dbg(f"metadata from FLAC: artist='{artist}', title='{title}'")
                return artist, title
        except Exception:
            dbg("mutagen could not read FLAC tags (or mutagen not available)")

    # Fall back to the file name, splitting on the first " - " only.
    stem = os.path.splitext(os.path.basename(path))[0]
    artist_part, separator, title_part = stem.partition(" - ")
    if separator:
        return artist_part.strip(), title_part.strip()
    return None, stem

def parse_lrc_header_tags(lrc_path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Extract the [ar:Artist] and [ti:Title] header tags from an LRC file.

    Only the first 50 lines are inspected, and scanning stops early once both
    tags are found. The file is decoded as "utf-8-sig" so that a leading UTF-8
    byte-order mark does not hide a tag placed on the very first line;
    undecodable bytes are ignored.

    Returns:
        (artist, title). Each element is None when its tag was not found or
        when the file could not be read (the failure is reported via dbg()).
    """
    artist = None
    title = None
    try:
        with open(lrc_path, "r", encoding="utf-8-sig", errors="ignore") as f:
            for line_no, line in enumerate(f):
                if line_no >= 50:
                    break
                line = line.strip()
                m_ti = re.match(r'^\s*\[ti\s*:\s*(.+?)\s*\]\s*$', line, re.I)
                m_ar = re.match(r'^\s*\[ar\s*:\s*(.+?)\s*\]\s*$', line, re.I)
                if m_ti:
                    title = m_ti.group(1).strip()
                if m_ar:
                    artist = m_ar.group(1).strip()
                if artist and title:
                    break
    except Exception:
        dbg("failed to parse LRC header tags")
    return artist, title

# -------------------------
# LYRICS DB MANAGEMENT
# -------------------------
def initialize_folders():
    """
    Prepare the output, DB and logs folders for a fresh run.

    An existing LYRICS_FOLDER is deleted with all its contents and recreated
    empty; LYRICS_DB_FOLDER and LOGS_FOLDER are only created when missing.
    """
    info("Initializing folders...")
    if os.path.exists(LYRICS_FOLDER):
        info(f"  Removing existing '{LYRICS_FOLDER}' directory...")
        shutil.rmtree(LYRICS_FOLDER)
    for folder in (LYRICS_FOLDER, LYRICS_DB_FOLDER, LOGS_FOLDER):
        os.makedirs(folder, exist_ok=True)
    dbg(f"created folders: {LYRICS_FOLDER}, {LYRICS_DB_FOLDER}, {LOGS_FOLDER}")

def copy_to_lyrics_db(lrc_path: str, title: str, artist: str):
    """
    Store a generated LRC in the lyrics DB as "<Title> - <Artist>.lrc".

    The DB file name is the sanitized "<title> - <artist>" string. An existing
    DB entry with that name is removed first, so the new file always wins.
    Failures are reported as a warning via info() and never raised.
    """
    try:
        db_filename = sanitize_filename(f"{title} - {artist}") + ".lrc"
        db_path = os.path.join(LYRICS_DB_FOLDER, db_filename)
        # Overwrite policy: the incoming LRC is authoritative.
        if os.path.exists(db_path):
            os.remove(db_path)
            dbg(f"removed existing DB entry to overwrite: {db_filename}")
        shutil.copy2(lrc_path, db_path)
        dbg(f"copied to DB: {db_filename}")
    except Exception as err:
        info(f"warning: error copying to DB: {err}")

def restore_from_lyrics_db(title: str, artist: str, target_lrc_path: str) -> bool:
    """
    Copy the DB entry "<Title> - <Artist>.lrc" to `target_lrc_path` if it exists.

    Returns True when the file was restored, False when there is no such entry
    or when copying failed (a warning is printed in that case).
    """
    try:
        db_filename = sanitize_filename(f"{title} - {artist}") + ".lrc"
        db_path = os.path.join(LYRICS_DB_FOLDER, db_filename)
        if not os.path.exists(db_path):
            dbg(f"not found in DB: {db_filename}")
            return False
        shutil.copy2(db_path, target_lrc_path)
        info(f"Restored from DB: {db_filename}")
        return True
    except Exception as err:
        info(f"warning: error restoring from DB: {err}")
        return False

def search_similar_in_db(title: str, artist: str, similarity_threshold: float = 0.8) -> Optional[str]:
    """
    Look for a DB entry whose file name closely resembles "<title> - <artist>".

    Each "*.lrc" file name in LYRICS_DB_FOLDER (without extension, lowercased)
    is scored against the lowercased target with compute_similarity(). The
    highest-scoring file is returned, provided its score is positive and at
    least `similarity_threshold`; on a tie the first one encountered wins.

    Returns the path of the matching DB file, or None when there is no match,
    the DB folder is missing, or an error occurred (a warning is printed).
    """
    try:
        if not os.path.exists(LYRICS_DB_FOLDER):
            return None
        wanted = f"{title} - {artist}".lower()
        winner = None
        winner_score = 0.0
        for candidate_path in glob.glob(os.path.join(LYRICS_DB_FOLDER, "*.lrc")):
            stem = os.path.splitext(os.path.basename(candidate_path))[0]
            score = compute_similarity(wanted, stem.lower())
            if score >= similarity_threshold and score > winner_score:
                winner, winner_score = candidate_path, score
        if not winner:
            return None
        info(f"  Similar found in DB: {os.path.basename(winner)} (score {winner_score:.3f})")
        return winner
    except Exception as err:
        info(f"warning: error searching DB: {err}")
        return None

# -------------------------
# INGEST EXISTING LRCs & SONGS CLEANUP
# -------------------------
def ingest_existing_lrcs_and_cleanup_songs():
    """
    Ingest .lrc files found in songs/ and then purge non-audio files from songs/.

    Every non-hidden regular file directly inside SONGS_FOLDER whose extension
    is ".lrc" (matched case-insensitively) is treated as authoritative:
      - it is MOVED into LYRICS_FOLDER, replacing any file with the same name;
      - it is COPIED into LYRICS_DB_FOLDER under the canonical
        "<Title> - <Artist>.lrc" name, OVERWRITING any existing DB entry.
    Artist/title are taken from the sibling "<name>.flac" when it exists, then
    from the [ar:]/[ti:] header tags of the LRC; when neither source yields
    both values, the sanitized LRC base name is used as the DB name.

    Afterwards every remaining regular file in SONGS_FOLDER whose extension is
    not .flac, .mp3 or .wav is deleted.

    Per-file failures are reported through info()/dbg() and skipped.
    """
    if not os.path.exists(SONGS_FOLDER):
        dbg("songs folder not present; skipping ingest")
        return

    info("Ingesting .lrc files found under songs/ and cleaning songs/...")
    # List the folder once and filter on the lower-cased extension. Globbing
    # both "*.lrc" and "*.LRC" yields every file twice wherever glob matching
    # is case-insensitive (Windows); the second pass would then delete the
    # freshly moved file from lyrics/. It also missed mixed-case extensions
    # such as ".Lrc", which the cleanup step below would delete un-ingested.
    # Dot-files are skipped, as the glob patterns did.
    lrc_files = sorted(
        os.path.join(SONGS_FOLDER, entry)
        for entry in os.listdir(SONGS_FOLDER)
        if not entry.startswith(".")
        and os.path.splitext(entry)[1].lower() == ".lrc"
        and os.path.isfile(os.path.join(SONGS_FOLDER, entry))
    )

    for lrc_path in lrc_files:
        try:
            basename = os.path.basename(lrc_path)
            dest_lyrics = os.path.join(LYRICS_FOLDER, basename)

            # If a file with same name already exists in lyrics/, remove it to ensure authoritative replace.
            try:
                if os.path.exists(dest_lyrics):
                    os.remove(dest_lyrics)
                    dbg(f"existing lyrics file removed to be replaced: {dest_lyrics}")
            except Exception as e:
                dbg(f"could not remove existing lyrics file {dest_lyrics}: {e}")

            # MOVE the LRC from songs/ to lyrics/ (LRCs in songs/ are considered fully correct).
            try:
                shutil.move(lrc_path, dest_lyrics)
                dbg(f"moved {basename} to {LYRICS_FOLDER}")
            except Exception:
                # If move fails (permissions, cross-device), fallback to copy+remove
                try:
                    shutil.copy2(lrc_path, dest_lyrics)
                    os.remove(lrc_path)
                    dbg(f"copied then removed original (move fallback) {basename} to {LYRICS_FOLDER}")
                except Exception as e2:
                    info(f"warning: could not move or copy {basename} to {LYRICS_FOLDER}: {e2}")
                    continue  # skip further processing for this file

            # Choose the canonical DB name, preferring metadata of the matching FLAC.
            name_no_ext = os.path.splitext(basename)[0]
            corresponding_flac = os.path.join(SONGS_FOLDER, f"{name_no_ext}.flac")
            artist_meta = None
            title_meta = None
            if os.path.exists(corresponding_flac):
                artist_meta, title_meta = extract_metadata_from_flac(corresponding_flac)

            # Fallback: read the [ti:] / [ar:] tags of the LRC (now at dest_lyrics).
            # They are only used when the LRC provides both of them.
            if not (artist_meta and title_meta):
                lrc_artist, lrc_title = parse_lrc_header_tags(dest_lyrics)
                if lrc_artist and lrc_title:
                    if not artist_meta:
                        artist_meta = lrc_artist
                    if not title_meta:
                        title_meta = lrc_title

            if artist_meta and title_meta:
                canonical_db_name = sanitize_filename(f"{title_meta} - {artist_meta}")
            else:
                canonical_db_name = sanitize_filename(name_no_ext)

            db_dest = os.path.join(LYRICS_DB_FOLDER, f"{canonical_db_name}.lrc")

            # COPY into DB, overwriting any existing entry (authoritative move).
            try:
                if os.path.exists(db_dest):
                    os.remove(db_dest)
                    dbg(f"existing DB entry removed to be replaced: {db_dest}")
                shutil.copy2(dest_lyrics, db_dest)
                dbg(f"copied to DB {os.path.basename(db_dest)} (overwrote if existed)")
            except Exception as e:
                info(f"warning: error copying {dest_lyrics} to DB: {e}")
        except Exception as e:
            info(f"warning: error processing {lrc_path}: {e}")

    # clean non-audio files in songs/
    info("  Cleaning songs/: deleting non-audio files...")
    allowed_exts = {'.flac', '.mp3', '.wav'}
    for entry in os.listdir(SONGS_FOLDER):
        fpath = os.path.join(SONGS_FOLDER, entry)
        if os.path.isfile(fpath):
            ext = os.path.splitext(entry)[1].lower()
            if ext not in allowed_exts:
                try:
                    os.remove(fpath)
                    dbg(f"removed file {entry} from songs/")
                except Exception as e:
                    info(f"warning: could not remove {entry}: {e}")
    dbg("ingest and cleanup finished")

Plain lyrics retrieval

In [None]:
# -------------------------
# GENIUS HELPERS
# -------------------------
def get_genius_client(token: str) -> Optional[Genius]:
    """
    Build a lyricsgenius client configured for this pipeline.

    The client is created with a 30 s timeout, 3 retries and section-header
    removal, then set to be non-verbose, to skip non-songs and to exclude
    remix/live/acoustic/instrumental results. Returns None (after printing a
    warning) if construction or configuration fails.
    """
    try:
        client = Genius(token, timeout=30, retries=3, remove_section_headers=True)
        client.verbose = False
        client.skip_non_songs = True
        client.excluded_terms = ["(Remix)", "(Live)", "(Acoustic)", "(Instrumental)"]
        dbg("Genius client initialized")
    except Exception as err:
        info(f"warning: error initializing Genius: {err}")
        return None
    return client

def _normalize_for_genius_match(s: str) -> str:
    """
    Normalization oriented to title/artist matching:
      - removes parentheses and common contents (feat., remix, version...)
      - removes punctuation and collapses multiple spaces
      - lowercases the result
    """
    if not s:
        return ""
    t = re.sub(r'\([^)]*\)', ' ', s)              # remove parentheses
    t = re.sub(r'\[[^\]]*\]', ' ', t)             # remove brackets
    t = re.sub(r'(?i)\b(feat|ft|featuring|remix|version|versión|live)\b', ' ', t)
    t = re.sub(r"[^\w\s']", " ", t)
    t = re.sub(r'\s+', ' ', t).strip().lower()
    return t

def _artist_matches(artist: str, cand_artist: str, sim_threshold: float = 0.78, token_frac: float = 0.6) -> bool:
    """
    Decide whether `cand_artist` can be considered the same artist as `artist`.

    Both names are normalized with _normalize_for_genius_match(); if either is
    empty (before or after normalization) the answer is False. Otherwise the
    names match when ANY of the following holds:
      - compute_similarity() of the two normalized names is >= sim_threshold
      - the fraction of `artist` tokens that also occur in `cand_artist` is >= token_frac
      - one normalized name is a substring of the other (e.g. "beatles" / "the beatles")
    """
    if not artist or not cand_artist:
        return False
    wanted = _normalize_for_genius_match(artist)
    found = _normalize_for_genius_match(cand_artist)
    if not wanted or not found:
        return False

    # 1) overall string similarity
    if compute_similarity(wanted, found) >= sim_threshold:
        return True

    # 2) share of the requested artist's tokens present in the candidate
    wanted_tokens = [tok for tok in re.split(r'\s+', wanted) if tok]
    found_tokens = {tok for tok in re.split(r'\s+', found) if tok}
    if wanted_tokens:
        hits = sum(1 for tok in wanted_tokens if tok in found_tokens)
        if hits / len(wanted_tokens) >= token_frac:
            return True

    # 3) substring containment in either direction
    return wanted in found or found in wanted

def _title_artist_similarity(title: str, artist: str, cand_title: str, cand_artist: str) -> Tuple[float, float, float]:
    """
    Score a candidate (cand_title, cand_artist) against the requested (title, artist).

    All four strings are normalized with _normalize_for_genius_match() first.
    Returns (title_sim, artist_sim, combined) where the two similarities come
    from compute_similarity() and
        combined = 0.92 * (0.65 * title_sim + 0.35 * artist_sim)
                 + 0.08 * mean(jaccard_tokens(title), jaccard_tokens(artist)).
    """
    norm_title = _normalize_for_genius_match(title or "")
    norm_cand_title = _normalize_for_genius_match(cand_title or "")
    norm_artist = _normalize_for_genius_match(artist or "")
    norm_cand_artist = _normalize_for_genius_match(cand_artist or "")

    title_sim = compute_similarity(norm_title, norm_cand_title)
    artist_sim = compute_similarity(norm_artist, norm_cand_artist)

    # Token-overlap support term, averaged over title and artist.
    title_jaccard = jaccard_tokens(norm_title, norm_cand_title)
    artist_jaccard = jaccard_tokens(norm_artist, norm_cand_artist)
    token_support = (title_jaccard + artist_jaccard) / 2.0

    # The title weighs more than the artist; token overlap only nudges the score.
    weighted = 0.65 * title_sim + 0.35 * artist_sim
    combined = 0.92 * weighted + 0.08 * token_support
    return title_sim, artist_sim, combined

def is_translation_content(text: str) -> bool:
    """
    Heuristically detect text that refers to a translation or alternate-language lyrics.

    The lowercased text is checked first for a fixed list of keyword
    substrings, then for two patterns: "sub"/"lyric(s)" followed by a language
    name or code, and a bracketed two-letter language code such as "[es]".
    A falsy input yields False.
    """
    haystack = (text or "").lower()
    keywords = ('traducción', 'translation', 'traduccion', 'versión', 'versao', 'traduit', 'tradução', 'spanish', 'español')
    if any(keyword in haystack for keyword in keywords):
        return True
    patterns = (r'\b(sub|lyrics?)\s+(es|español|spanish|pt|português|portuguese)\b', r'\[(es|en|pt|fr|de|it)\]')
    return any(re.search(pattern, haystack) for pattern in patterns)

def search_genius_candidates(artist: str, title: str, per_page: int = 30) -> List[dict]:
    """
    Search Genius for "<title> <artist>" and score every usable hit.

    Hits without a song id, and hits whose title/artist look like a translation
    (is_translation_content), are dropped. Every other hit becomes a dict:
      {title, artist, song_id, title_sim, artist_sim, combined, artist_ok}
    The list is returned sorted by `combined`, best first. Lyrics are not
    downloaded here. An empty list is returned when artist or title is missing
    or the search has no hits; if an error occurs, it is reported via dbg()
    and whatever was collected so far is returned.
    """
    if not artist or not title:
        return []

    scored = []
    try:
        response = genius.search_songs(f"{title} {artist}", per_page=per_page)
        if not response or not response.get('hits'):
            return []
        for hit in response.get('hits', []):
            song_info = hit.get('result', {}) if isinstance(hit, dict) else {}
            cand_title = song_info.get('title', '')
            cand_artist = song_info.get('primary_artist', {}).get('name', '')
            song_id = song_info.get('id')
            if not song_id or is_translation_content(f"{cand_title} {cand_artist}"):
                continue
            t_sim, a_sim, combined = _title_artist_similarity(title, artist, cand_title, cand_artist)
            scored.append({
                'title': cand_title,
                'artist': cand_artist,
                'song_id': song_id,
                'title_sim': t_sim,
                'artist_sim': a_sim,
                'combined': combined,
                'artist_ok': _artist_matches(artist, cand_artist, sim_threshold=0.76, token_frac=0.6),
            })
    except Exception as err:
        dbg(f"search_genius_candidates error: {err}")
    # best combined score first
    return sorted(scored, key=lambda cand: cand['combined'], reverse=True)

# -------------------------
# get_genius_lyrics_simple (uses already-searched candidates)
# -------------------------
def get_genius_lyrics_simple_from_candidates(artist: str, title: str, candidates: List[dict], max_try: int = 8) -> str:
    """
    Download and validate lyrics for the first acceptable entry of `candidates`.

    Args:
        artist: requested artist, used for the final artist/title validation.
        title: requested title, used for the final title validation.
        candidates: scored candidate dicts; the keys read here are
            'title', 'artist', 'song_id', 'artist_sim' and 'artist_ok'.
        max_try: maximum number of failed download/validation attempts.
            Candidates skipped because 'artist_ok' is falsy do not count.

    Rules:
      - Reject candidates whose artist does not pass _artist_matches (artist_ok False)
      - Try to download by song_id first, then fall back to genius.search_song
      - Reject lyrics that are empty, shorter than 30 characters (stripped),
        or that contain the word "instrumental"
      - Validate the returned artist with _artist_matches, then the returned title

    Returns:
        The lyrics text of the first candidate that passes every check.

    Raises:
        RuntimeError: if `candidates` is empty or no candidate yields valid lyrics.
    """
    if not candidates:
        raise RuntimeError("No search candidates")

    # Counts failed attempts only; pre-download artist mismatches are not counted.
    tried = 0
    for cand in candidates:
        if tried >= max_try:
            break

        # Require matching artist to attempt download (strict policy)
        if not cand.get('artist_ok', False):
            dbg(f"  skipping candidate (artist mismatch pre-download): {cand['title']} - {cand['artist']} (a_sim={cand['artist_sim']:.2f})")
            continue

        try:
            # first try by id
            song_obj = None
            try:
                # NOTE(review): `genius` is a module-level client not defined in this
                # function — confirm it is initialized before this is called.
                song_data = genius.song(cand['song_id'])
                if isinstance(song_data, dict) and 'song' in song_data:
                    song_info = song_data['song']
                    lyrics_text = song_info.get('lyrics') or song_info.get('lyrics_body') or ""
                    if lyrics_text:
                        # Build a minimal ad-hoc object exposing .title, .primary_artist.name
                        # and .lyrics so both download paths can be handled uniformly below.
                        song_obj = type("S", (), {"title": song_info.get('title', cand['title']),
                                                  "primary_artist": type("A", (), {"name": song_info.get('primary_artist', {}).get('name', cand['artist'])}),
                                                  "lyrics": lyrics_text})
            except Exception:
                # Any failure of the by-id lookup simply triggers the search fallback.
                song_obj = None

            if song_obj is None:
                # fallback locally to search_song (already filtered by artist_ok)
                song_obj = genius.search_song(cand['title'], cand['artist'])

            if not song_obj:
                tried += 1
                continue

            lyrics_text = getattr(song_obj, "lyrics", "") or ""
            if not lyrics_text or len(lyrics_text.strip()) < 30:
                tried += 1
                continue
            # Substring test: any occurrence of "instrumental" in the lyrics rejects the candidate.
            if 'instrumental' in lyrics_text.lower():
                tried += 1
                continue

            returned_title = getattr(song_obj, "title", cand['title'])
            returned_artist = getattr(getattr(song_obj, "primary_artist", None), "name", cand['artist'])

            # Final ARTIST validation (mandatory)
            if not _artist_matches(artist, returned_artist, sim_threshold=0.78, token_frac=0.7):
                dbg(f"  rejected after download: artist mismatch -> returned '{returned_artist}'")
                tried += 1
                continue

            # Final title validation: moderate (if artist matches strongly, allow small title variations)
            # Rejected only when BOTH the title similarity and the combined score are low.
            rt_sim, ra_sim, r_comb = _title_artist_similarity(title, artist, returned_title, returned_artist)
            if rt_sim < 0.60 and r_comb < 0.80:
                dbg(f"  rejected after download: title mismatch (rt_sim={rt_sim:.2f}, comb={r_comb:.2f})")
                tried += 1
                continue

            info(f"    Valid lyrics obtained (len={len(lyrics_text)}) from '{returned_title}' - '{returned_artist}'")
            return lyrics_text

        except Exception as e:
            # A failure on one candidate counts as an attempt and moves on to the next.
            dbg(f"  candidate fetch error: {e}")
            tried += 1
            continue

    raise RuntimeError("No valid lyrics after candidate filtering")

def get_genius_lyrics_fallback(artist: str, title: str) -> str:
    """
    Strict last-resort lookup through genius.search_song(title, artist).

    The result is accepted only when:
      - the lyrics are at least 30 characters long (stripped) and do not
        contain the word "instrumental";
      - the returned artist equals the requested one after normalization, or
        their similarity is >= 0.90;
      - the title is similar enough (rejected only if t_sim < 0.70 and combined < 0.85).

    Returns the lyrics text.

    Raises:
        RuntimeError: "Artist/title missing" when either argument is empty;
            otherwise "Fallback error: <reason>" for any failure or rejection.
    """
    if not artist or not title:
        raise RuntimeError("Artist/title missing")

    try:
        dbg(f"    fallback strict search for '{title}' - '{artist}'")
        song = genius.search_song(title, artist)
        if not song:
            raise RuntimeError("Song not found")

        # Prefer the attribute; fall back to the dict form when it is empty.
        lyrics_text = getattr(song, "lyrics", None)
        if not lyrics_text and hasattr(song, 'to_dict'):
            as_dict = song.to_dict()
            lyrics_text = as_dict.get('lyrics') or as_dict.get('lyrics_body') or ""

        if not lyrics_text or len(lyrics_text.strip()) < 30:
            raise RuntimeError("Empty or too short lyrics")
        if 'instrumental' in lyrics_text.lower():
            raise RuntimeError("Instrumental")

        returned_title = getattr(song, "title", title)
        returned_artist = getattr(getattr(song, "primary_artist", None), "name", artist)

        # Artist must match VERY strongly: normalized equality or similarity >= 0.90.
        requested_norm = _normalize_for_genius_match(artist)
        returned_norm = _normalize_for_genius_match(returned_artist)
        artist_ok = (
            requested_norm == returned_norm
            or compute_similarity(requested_norm, returned_norm) >= 0.90
        )
        if not artist_ok:
            raise RuntimeError(f"Returned song artist '{returned_artist}' does not match requested artist '{artist}' (fallback rejected)")

        # The title must also be reasonably similar.
        t_sim, a_sim, comb = _title_artist_similarity(title, artist, returned_title, returned_artist)
        if t_sim < 0.70 and comb < 0.85:
            raise RuntimeError(f"Returned song title not similar enough (t_sim={t_sim:.2f}, comb={comb:.2f})")

        info(f"    Fallback obtained (len={len(lyrics_text)}), matched (t={t_sim:.2f}, a={a_sim:.2f})")
        return lyrics_text
    except Exception as err:
        raise RuntimeError(f"Fallback error: {err}")

def get_genius_lyrics(artist: str, title: str) -> str:
    """
    Fetch lyrics from Genius using the candidate search, then a strict fallback.

    Steps:
      1. Search and score candidates; fail if there are none.
      2. If NO candidate has artist_ok set, fail immediately (no fallback).
      3. Try get_genius_lyrics_simple_from_candidates (max_try=8).
      4. If that fails, try get_genius_lyrics_fallback.

    Returns the lyrics text.

    Raises:
        RuntimeError: "Error: <reason>" for every failure, including the case
            where both download strategies failed.
    """
    failures = []
    try:
        candidates = search_genius_candidates(artist, title, per_page=30)
        if not candidates:
            raise RuntimeError("No search hits at all")

        # Does any search hit carry a matching artist (per _artist_matches)?
        any_artist_ok = any(c.get('artist_ok', False) for c in candidates)
        dbg(f"  search produced {len(candidates)} candidates; any_artist_ok={any_artist_ok}")
        if not any_artist_ok:
            # policy: without a single artist-matching candidate, DO NOT attempt fallback.
            raise RuntimeError("No candidate with matching artist found")

        # Strategies in order: (error label, debug prefix, callable).
        strategies = (
            (
                "SimpleCandidates",
                "simple-from-candidates failed",
                lambda: get_genius_lyrics_simple_from_candidates(artist, title, candidates, max_try=8),
            ),
            (
                "Fallback",
                "fallback failed",
                lambda: get_genius_lyrics_fallback(artist, title),
            ),
        )
        for label, debug_prefix, attempt in strategies:
            try:
                return attempt()
            except Exception as err:
                failures.append(f"{label}: {err}")
                dbg(f"{debug_prefix}: {err}")

        raise RuntimeError("All Genius methods failed: " + " | ".join(failures))

    except Exception as err:
        raise RuntimeError(f"Error: {err}")

Transcription

In [None]:
# -------------------------
# WHISPER TRANSCRIPTION & POSTPROCESS
# -------------------------
def transcribe_with_whisper(path: str, model_size: str):
    """
    Transcribe the audio file at `path` with the Whisper model `model_size`.

    Word timestamps are requested; if the transcribe call raises TypeError it
    is retried without that option. For segments that carry no word list, the
    normalized segment text is split into tokens spread evenly over the
    segment. The collected words are passed through clean_transcribed_words().

    Returns a dict with keys:
      - 'words': list of (token, start, end) after cleaning
      - 'segments': list of {'text', 'start', 'end', 'avg_logprob', 'no_speech_prob'}
      - 'duration': result['duration'] when present, else the end time of the
        last raw word (0.0 when there are no words)

    Raises:
        RuntimeError: if the model cannot be loaded.
    """
    dbg(f"loading model '{model_size}'")
    try:
        model = whisper.load_model(model_size)
    except Exception as e:
        raise RuntimeError(f"Error loading model '{model_size}': {e}")

    dbg(f"transcribing with model '{model_size}'")
    decode_options = {"language": None, "temperature": 0.0}
    try:
        # prefer word_timestamps if supported by the API; fallback gracefully
        result = model.transcribe(path, word_timestamps=True, **decode_options)
    except TypeError:
        result = model.transcribe(path, **decode_options)

    raw_words: List[Tuple[str, float, float]] = []
    segments_meta: List[Dict] = []
    for seg in result.get("segments", []):
        seg_start = float(seg.get("start", 0.0))
        seg_end = float(seg.get("end", seg_start + 0.01))
        seg_text = seg.get("text", "").strip()
        segments_meta.append({
            "text": seg_text,
            "start": seg_start,
            "end": seg_end,
            "avg_logprob": seg.get("avg_logprob"),
            "no_speech_prob": seg.get("no_speech_prob"),
        })

        timed_words = seg.get('words')
        if isinstance(timed_words, list) and timed_words:
            # Per-word timestamps are available: use them directly.
            for entry in timed_words:
                token = entry.get('word', '').strip()
                if not token:
                    continue
                w_start = float(entry.get('start', seg_start))
                w_end = float(entry.get('end', w_start + 0.05))
                raw_words.append((normalize_text(token), w_start, w_end))
            continue

        # fallback: slice segment evenly across words
        toks = [t for t in re.split(r"\s+", normalize_text(seg_text)) if t]
        if not toks:
            continue
        per = (seg_end - seg_start) / max(1, len(toks))
        for i, tok in enumerate(toks):
            w_start = seg_start + i * per
            raw_words.append((tok, w_start, min(seg_end, w_start + per)))

    duration = result.get('duration', raw_words[-1][2] if raw_words else 0.0)
    dbg(f"raw transcription: {len(raw_words)} tokens, duration ~{duration:.2f}s, {len(segments_meta)} segments")

    cleaned_words = clean_transcribed_words(raw_words)
    dbg(f"cleaned transcription: {len(cleaned_words)} tokens")
    return {"words": cleaned_words, "segments": segments_meta, "duration": duration}

def clean_transcribed_words(words: List[Tuple[str, float, float]]) -> List[Tuple[str, float, float]]:
    """
    Tidy up a list of (token, start, end) word timestamps.

    Two passes are applied:
      1. Runs of identical consecutive tokens are collapsed into a single entry
         when the gap between them is at most 0.25s; the merged entry keeps the
         first start and the last end. Empty / whitespace-only tokens are dropped.
      2. Entries lasting less than 0.03s are discarded as probable noise.

    Returns a new list; the input is not modified.
    """
    if not words:
        return []

    # Pass 1: collapse stutters and drop blank tokens.
    merged: List[Tuple[str, float, float]] = []
    cur_tok, cur_start, cur_end = words[0]
    for tok, start, end in words[1:]:
        if tok == cur_tok and (start - cur_end) <= 0.25:
            # same token repeated almost immediately: stretch the pending entry
            cur_end = end
            continue
        if cur_tok and cur_tok.strip():
            merged.append((cur_tok, cur_start, cur_end))
        cur_tok, cur_start, cur_end = tok, start, end
    # flush the entry still pending after the loop
    if cur_tok and cur_tok.strip():
        merged.append((cur_tok, cur_start, cur_end))

    # Pass 2: keep only entries that are not shorter than 0.03s.
    return [(tok, start, end) for tok, start, end in merged if not ((end - start) < 0.03)]

# -------------------------
# TRANSCRIPTION LOGGING HELPERS
# -------------------------
def save_transcription_files(basename: str, words: List[Tuple[str,float,float]], model_name: str):
    """
    Dump a transcription into LOGS_FOLDER for later inspection.

    Two files are written, both named after `basename` and `model_name`:
      - `<basename>.<model>.transcript.txt`: all tokens joined by single spaces
      - `<basename>.<model>.whisper.ts.txt`: one `start<TAB>end<TAB>token` line per word

    Best-effort: any exception is reported through info() and swallowed.
    """
    try:
        transcript_path = os.path.join(LOGS_FOLDER, f"{basename}.{model_name}.transcript.txt")
        with open(transcript_path, "w", encoding="utf-8") as transcript_file:
            transcript_file.write(" ".join(entry[0] for entry in words))

        ts_path = os.path.join(LOGS_FOLDER, f"{basename}.{model_name}.whisper.ts.txt")
        with open(ts_path, "w", encoding="utf-8") as ts_file:
            ts_file.writelines(
                f"{start:.3f}\t{end:.3f}\t{token}\n" for token, start, end in words
            )

        dbg(f"saved transcript and timestamps for {basename} ({model_name}) in {LOGS_FOLDER}")
    except Exception as exc:
        info(f"warning: error saving transcription files: {exc}")

def save_model_logs_text(basename: str, model_name: str, segments_meta, candidate_anchors_all, accepted_candidates, anchors_dict):
    """
    Write a human-readable report for one model run to
    `<LOGS_FOLDER>/<basename>.<model_name>.log`.

    The report contains, in order:
      - the model name
      - every segment with its start/end time and text
      - every anchor candidate tuple
      - the candidates that were accepted
      - the final anchors as `line index -> time`, sorted by index

    Best-effort: any exception is reported through dbg() and swallowed.
    """
    try:
        log_path = os.path.join(LOGS_FOLDER, f"{basename}.{model_name}.log")
        with open(log_path, "w", encoding="utf-8") as log:
            log.write(f"Model: {model_name}\n")
            log.write(f"Segments ({len(segments_meta)}):\n")
            log.writelines(
                f"  [{seg.get('start',0):.3f}-{seg.get('end',0):.3f}] {seg.get('text','').strip()}\n"
                for seg in segments_meta
            )
            log.write("\nAll candidates (idx, ts, score, overlap):\n")
            log.writelines(f"  {candidate}\n" for candidate in candidate_anchors_all)
            log.write("\nAccepted candidates after spacing (idx, ts, score, overlap):\n")
            log.writelines(f"  {accepted}\n" for accepted in accepted_candidates)
            log.write("\nFinal anchors (index -> time):\n")
            for line_idx, anchor_time in sorted(anchors_dict.items()):
                log.write(f"  {line_idx} -> {anchor_time:.3f}\n")
        dbg(f"saved plain-text logs for {basename} ({model_name}) at {log_path}")
    except Exception as exc:
        dbg(f"error saving logs for {basename} ({model_name}): {exc}")

# -------------------------
# SILENCE HELPERS & INTERVAL MATH
# -------------------------
def detect_silences(words: List[Tuple[str,float,float]], min_silence: float = MIN_SILENCE_DURATION) -> List[Tuple[float,float]]:
    """
    Find the pauses between consecutive words.

    For each adjacent pair of (token, start, end) entries, the gap runs from the
    end of the first word to the start of the next one. Gaps of at least
    `min_silence` seconds are returned as (gap_start, gap_end) tuples, in input order.
    """
    if not words:
        return []
    return [
        (current[2], following[1])
        for current, following in zip(words, words[1:])
        if following[1] - current[2] >= min_silence
    ]

def long_silences_from_words(words: List[Tuple[str,float,float]], threshold: float = LONG_SILENCE_THRESHOLD) -> List[Tuple[float,float]]:
    """
    Return the inter-word gaps lasting at least `threshold` seconds.

    All non-negative gaps are collected first (detect_silences with a 0.0 minimum),
    then only those whose length reaches `threshold` are kept.
    """
    long_gaps = []
    for gap in detect_silences(words, min_silence=0.0):
        gap_start, gap_end = gap[0], gap[1]
        if (gap_end - gap_start) >= threshold:
            long_gaps.append(gap)
    return long_gaps

def subtract_long_silences_from_interval(a: float, b: float, long_silences: List[Tuple[float,float]]) -> List[Tuple[float,float]]:
    """
    Remove every long silence from the interval [a, b].

    Returns the surviving "active" sub-intervals as a sorted list of (start, end)
    tuples, each clipped to [a, b] and with start < end. An empty list is
    returned when a >= b or when the silences cover the whole interval.
    """
    if a >= b:
        return []

    remaining = [(a, b)]
    for sil_start, sil_end in sorted(long_silences):
        pieces = []
        for lo, hi in remaining:
            if sil_end <= lo or sil_start >= hi:
                # silence does not touch this piece: keep it whole
                pieces.append((lo, hi))
                continue
            # silence overlaps: keep whatever sticks out on either side
            if sil_start > lo:
                pieces.append((lo, min(sil_start, hi)))
            if sil_end < hi:
                pieces.append((max(sil_end, lo), hi))
        remaining = pieces
        if not remaining:
            break

    # Clip to [a, b] and drop anything that collapsed to zero length.
    clipped = []
    for lo, hi in remaining:
        lo_c, hi_c = max(a, lo), min(b, hi)
        if lo_c < hi_c:
            clipped.append((lo_c, hi_c))
    return sorted(clipped)

def distribute_across_active_intervals(n_items: int, active_intervals: List[Tuple[float,float]]) -> List[float]:
    """
    Spread `n_items` timestamps evenly over the combined length of the active intervals.

    The intervals are treated as one continuous stretch of audio: item j (1-based)
    lands at the fraction j / (n_items + 1) of the total active time, so no item
    sits exactly on the outer boundaries.

    Returns one timestamp per item. When the total active time is at most 1e-6
    seconds, every item gets 0.0.
    """
    if n_items <= 0:
        return []

    lengths = [end - start for (start, end) in active_intervals]
    total_active = sum(lengths)
    if total_active <= 1e-6:
        return [0.0] * n_items

    placed = []
    for rank in range(1, n_items + 1):
        target = (rank / (n_items + 1)) * total_active
        consumed = 0.0
        # walk the intervals until the one containing the target offset is found
        for (start, _end), length in zip(active_intervals, lengths):
            if consumed + length >= target - 1e-9:
                placed.append(start + (target - consumed))
                break
            consumed += length

    # Safety net: pad with 0.1s steps from the first interval if anything is missing.
    missing = n_items - len(placed)
    if missing > 0 and active_intervals:
        base = active_intervals[0][0]
        placed.extend(base + 0.1 * (k + 1) for k in range(missing))
    return placed

# -------------------------
# MATCHING HELPERS
# -------------------------
def compute_similarity(a: str, b: str) -> float:
    """
    Similarity ratio between two strings after normalization.

    Both inputs go through normalize_text() and are then compared with
    difflib.SequenceMatcher. Returns 0.0 when either normalized string is empty.
    """
    left = normalize_text(a)
    right = normalize_text(b)
    if left and right:
        return difflib.SequenceMatcher(None, left, right).ratio()
    return 0.0

def compute_overlap_fraction(line_words: List[str], segment_words: List[str]) -> float:
    """
    Fraction of lyric words that also occur in a transcription window.

    Every word on both sides is passed through normalize_text() and compared by
    exact equality. Lyric words that normalize to an empty string never count
    as a match but still count in the denominator.

    Args:
        line_words: words of the lyric line.
        segment_words: words of the candidate transcription window.

    Returns:
        matches / len(line_words), or 0.0 when `line_words` is empty.
    """
    if not line_words:
        return 0.0
    # A real set gives O(1) membership; this function runs once per candidate
    # window in the matcher, so a list scan per word adds up quickly.
    segment_vocab = {normalize_text(w) for w in segment_words}
    hits = 0
    for word in line_words:
        token = normalize_text(word)
        if token and token in segment_vocab:
            hits += 1
    return hits / len(line_words)

def find_best_match_for_line_strict(line: str, words_timing: List[Tuple[str,float,float]], segments_text: Optional[List[Tuple[str,float,float]]] = None) -> Tuple[Optional[int], float, float, int]:
    """
    Locate the stretch of transcription that best matches one lyric line.

    Token windows whose length is the lyric word count -1, +0 and +1 (never less
    than 1) are slid over `words_timing`. Each window is scored as
    0.85 * string similarity + 0.15 * word overlap, then scaled by a factor
    between 0.9 and 1.0 that favours windows of the expected length.

    When `segments_text` is supplied, any segment whose similarity to the line
    exceeds 0.55 competes with a score of `similarity + 0.10`. If a segment wins,
    the returned index is None and the timestamp is the segment start.

    Returns (window_start_index_or_None, best_score, best_timestamp, matched_length).
    (None, 0.0, 0.0, 0) is returned for a blank line, an empty transcription, or
    a line with no words left after normalization.
    """
    if not line.strip() or not words_timing:
        return None, 0.0, 0.0, 0
    lyric_words = [tok for tok in re.split(r"\s+", normalize_text(line)) if tok]
    if not lyric_words:
        return None, 0.0, 0.0, 0

    transcript_tokens = [entry[0] for entry in words_timing]
    token_count = len(transcript_tokens)
    expected_len = len(lyric_words)

    winner_index: Optional[int] = None
    winner_score = 0.0
    winner_ts = 0.0
    winner_len = 0

    # Word-level search over windows close to the lyric line's own length.
    shortest_window = max(1, expected_len - 1)
    for window_len in range(shortest_window, expected_len + 2):
        for start in range(token_count - window_len + 1):
            window = transcript_tokens[start:start + window_len]
            similarity = compute_similarity(line, " ".join(window))
            overlap = compute_overlap_fraction(lyric_words, window)
            blended = similarity * 0.85 + (0.15 * overlap)
            length_fit = 1.0 - abs(len(window) - expected_len) / max(1, expected_len)
            blended *= (0.9 + 0.1 * length_fit)
            if blended > winner_score:
                winner_score = blended
                winner_index = start
                winner_ts = words_timing[start][1]
                winner_len = window_len

    # Segment-level search: a whole segment may beat every word window.
    for seg_text, seg_start, _seg_end in (segments_text or ()):
        seg_similarity = compute_similarity(line, seg_text)
        if not (seg_similarity > 0.55):
            continue
        boosted = seg_similarity + 0.10
        if boosted > winner_score:
            winner_score = boosted
            winner_index = None
            winner_ts = seg_start
            winner_len = max(1, len([tok for tok in re.split(r"\s+", seg_text) if tok]))

    return winner_index, winner_score, winner_ts, winner_len



Raw lyrics filtering and LRC building

In [None]:
# -------------------------
# DUPLICATE DETECTION (UNION-FIND)
# -------------------------
def normalize_for_dup_check(s: str) -> str:
    """
    Canonical form of a lyric line for duplicate detection.

    Parenthesised and bracketed spans are removed, every character that is not a
    word character, whitespace or an apostrophe becomes a space, whitespace runs
    are collapsed, and the result is stripped and lower-cased.
    None yields an empty string.
    """
    if s is None:
        return ""
    cleaned = s
    # order matters: drop (...) and [...] spans before generic punctuation
    for pattern in (r'\([^)]*\)', r'\[[^\]]*\]', r"[^\w\s']"):
        cleaned = re.sub(pattern, ' ', cleaned)
    return re.sub(r'\s+', ' ', cleaned).strip().lower()

def jaccard_tokens(a: str, b: str) -> float:
    """
    Jaccard index of the word sets of two strings.

    Each string is normalized with normalize_for_dup_check() and split on
    whitespace; the result is |intersection| / |union| of the two token sets,
    or 0.0 when both sets are empty.
    """
    tokens_a = {tok for tok in re.split(r'\s+', normalize_for_dup_check(a)) if tok}
    tokens_b = {tok for tok in re.split(r'\s+', normalize_for_dup_check(b)) if tok}
    if not (tokens_a or tokens_b):
        return 0.0
    combined = tokens_a | tokens_b
    if not combined:
        return 0.0
    return len(tokens_a & tokens_b) / len(combined)

class UnionFind:
    """Minimal disjoint-set structure over the integers 0..n-1, used to cluster repeated lyric lines."""

    def __init__(self, n):
        # every element starts as the root of its own singleton set
        self.parent = list(range(n))

    def find(self, x):
        """Return the root of the set containing `x`, shortening the visited path along the way."""
        parent = self.parent
        while True:
            up = parent[x]
            if up == x:
                return x
            # point x at its grandparent, then continue from there
            parent[x] = parent[up]
            x = parent[x]

    def union(self, a, b):
        """Merge the sets containing `a` and `b`; the root of `a`'s set becomes the common root."""
        root_a = self.find(a)
        root_b = self.find(b)
        if root_a != root_b:
            self.parent[root_b] = root_a

def cluster_similar_lines_robust(lines: List[str]) -> Tuple[List[int], Dict[int, List[int]]]:
    """
    Group near-identical lyric lines (e.g. chorus repeats) so they can be kept
    out of anchor selection.

    Two lines are linked when, after normalize_for_dup_check(), either
      - their SequenceMatcher ratio is at least 0.92, or
      - the Jaccard index of their token sets is at least 0.75.
    Links are transitive (union-find). Lines that normalize to an empty string
    are never linked to anything. Both thresholds are hardcoded.

    Returns:
      - cluster_id_by_index: list mapping line index -> cluster id
      - clusters: dict cluster_id -> list of line indices in that cluster
    Cluster ids are assigned from 0 upward in order of first appearance.
    """
    seq_thresh = 0.92
    jaccard_thresh = 0.75

    total = len(lines)
    if total == 0:
        return [], {}

    uf = UnionFind(total)
    normalized = [normalize_for_dup_check(text) for text in lines]

    # Pairwise comparison; empty normalized lines take no part in it.
    for i in range(total):
        first = normalized[i]
        if not first:
            continue
        for j in range(i + 1, total):
            second = normalized[j]
            if not second:
                continue
            seq_sim = difflib.SequenceMatcher(None, first, second).ratio()
            jac = jaccard_tokens(first, second)
            if seq_sim >= seq_thresh or jac >= jaccard_thresh:
                uf.union(i, j)

    # Translate union-find roots into dense cluster ids.
    root_to_cid: Dict[int, int] = {}
    clusters: Dict[int, List[int]] = {}
    cluster_id_by_index: List[int] = []
    for idx in range(total):
        root = uf.find(idx)
        cid = root_to_cid.get(root)
        if cid is None:
            cid = len(root_to_cid)
            root_to_cid[root] = cid
            clusters[cid] = []
        clusters[cid].append(idx)
        cluster_id_by_index.append(cid)

    dbg(f"clustered {total} lines into {len(clusters)} clusters")
    return cluster_id_by_index, clusters

# -------------------------
# WRITE LRC HELPERS
# -------------------------
def write_lrc(basename: str, lines: List[str], times: List[float]):
    """
    Write a standard LRC file into LYRICS_FOLDER.

    Each lyric line is emitted as `[mm:ss.xx]text`. Lines and times are paired
    with zip(), so extra entries on the longer side are ignored.

    Args:
        basename: file name without extension; output is `<LYRICS_FOLDER>/<basename>.lrc`.
        lines: lyric lines, in display order.
        times: timestamp in seconds for each line.

    Returns:
        The path of the file that was written.
    """
    out_path = os.path.join(LYRICS_FOLDER, f"{basename}.lrc")
    with open(out_path, "w", encoding="utf-8") as f:
        for line, t in zip(lines, times):
            # Round to whole centiseconds BEFORE splitting into minutes/seconds.
            # Formatting the seconds with :05.2f would turn e.g. 59.996 into
            # "60.00" and emit the invalid tag [00:60.00]. Negative times are
            # clamped to zero so the tag is always well-formed.
            total_cs = max(0, int(round(t * 100)))
            minutes, rem_cs = divmod(total_cs, 6000)
            seconds, centis = divmod(rem_cs, 100)
            f.write(f"[{minutes:02d}:{seconds:02d}.{centis:02d}]{line}\n")
    dbg(f"written LRC to {out_path}")
    return out_path

def enforce_monotonic(times: List[float], min_prog: float = MIN_LINE_PROGRESSION) -> List[float]:
    """
    Return a copy of `times` in which every value advances past the previous one.

    Any value that does not exceed its (already adjusted) predecessor by more
    than 1e-9 is replaced with `predecessor + min_prog`.
    """
    adjusted: List[float] = []
    previous = -1e9
    for value in times:
        current = previous + min_prog if value <= previous + 1e-9 else value
        adjusted.append(current)
        previous = current
    return adjusted

# -------------------------
# ANCHORS COMPUTATION
# -------------------------
def compute_anchors_from_transcription(lines: List[str], words: List[Tuple[str,float,float]], segments_text: List[Tuple[str,float,float]], long_silences: List[Tuple[float,float]], cluster_ids: List[int],
                                       repeated_cluster_ids: set) -> Tuple[Dict[int,float], float, List[Tuple[int,float,float,float]], List[Tuple[int,float,float,float]]]:
    """
    Pick reliable (lyric line -> timestamp) anchors from a transcription.

    Phase 1 - candidates: every line that is not part of a repeated cluster is
    matched with find_best_match_for_line_strict(); a candidate tuple
    (line_index, timestamp, score, overlap) is recorded whenever a window was
    found or the score is positive.

    Phase 2 - acceptance: candidates are visited in timestamp order and accepted
    when score >= THRESH_ANCHOR, overlap >= MIN_OVERLAP, the timestamp lies more
    than MIN_ANCHOR_SPACING after the previously accepted anchor, and it does not
    fall inside a long silence.

    Returns:
      - anchors: dict line index -> timestamp
      - score_sum: sum of the accepted candidates' scores
      - accepted_candidates: accepted candidate tuples, in timestamp order
      - all_candidates: every candidate tuple, sorted by timestamp
    """
    # NOTE(review): acceptance is ordered by time only; nothing here checks that
    # accepted line indices also increase with time.
    all_candidates = []
    for idx, line in enumerate(lines):
        if cluster_ids and cluster_ids[idx] in repeated_cluster_ids:
            # repeated lines (e.g. chorus) are ambiguous: never anchor on them
            continue
        pos, score, ts, match_len = find_best_match_for_line_strict(line, words, segments_text)
        if pos is None and not (score > 0):
            continue
        lyric_words = [tok for tok in re.split(r"\s+", normalize_text(line)) if tok]
        if pos is not None and match_len > 0:
            window = words[pos:pos+match_len]
        else:
            # segment-level match: take the words starting within 2s of the timestamp
            nearby = [w for w in words if abs(w[1] - ts) < 2.0]
            window = nearby[:len(lyric_words)]
        matched_tokens = [w[0] for w in window]
        overlap = compute_overlap_fraction(lyric_words, matched_tokens)
        all_candidates.append((idx, ts, score, overlap))

    all_candidates.sort(key=lambda cand: cand[1])

    anchors: Dict[int, float] = {}
    accepted = []
    total_score = 0.0
    previous_anchor_ts = -9999.0
    for idx, ts, score, overlap in all_candidates:
        passes_thresholds = (
            score >= THRESH_ANCHOR
            and overlap >= MIN_OVERLAP
            and ts > previous_anchor_ts + MIN_ANCHOR_SPACING
        )
        if not passes_thresholds:
            continue
        if any(s <= ts <= e for (s,e) in long_silences):
            dbg(f"candidate idx={idx} ts={ts:.2f} skipped because inside long silence")
            continue
        anchors[idx] = ts
        previous_anchor_ts = ts
        total_score += score
        accepted.append((idx, ts, score, overlap))

    dbg(f"accepted anchors: {len(anchors)}, score_sum: {total_score:.3f}")
    return anchors, total_score, accepted, all_candidates

def compute_final_times_from_anchors(lines: List[str], anchors: Dict[int,float], words: List[Tuple[str,float,float]], long_silences: List[Tuple[float,float]], duration: float) -> List[float]:
    """
    Produce one timestamp per lyric line by interpolating around the anchors.

    Anchored lines keep their anchor time. The other lines are filled in per region:
      - No anchors at all: lines are spread uniformly from 0 to `duration`.
      - HEAD (lines before the first anchor): distributed over the active audio
        between 0 and the first anchor time. Only when no active interval remains
        is the weighted equispacing with FIRST_LINE_WEIGHT used.
      - BETWEEN two consecutive anchors: distributed over the active audio between
        their times, or placed at fixed 1.5s steps after the earlier anchor when
        there is no active audio.
      - TAIL (lines after the last anchor): distributed over the active audio
        between the last anchor and `duration`, or stepped by FALLBACK_SPACING
        (capped at `duration`) otherwise.
    "Active audio" is the interval minus `long_silences`.

    Args:
        lines: lyric lines; only their count is used here.
        anchors: mapping line index -> timestamp.
        words: transcription tokens. Not referenced in this function body.
        long_silences: (start, end) intervals to exclude from interpolation.
        duration: track duration in seconds.

    Returns:
        List of timestamps (len == len(lines)) after enforce_monotonic().
    """
    # Seed the result with the anchored lines; everything else stays None for now.
    final_times: List[Optional[float]] = [None] * len(lines)
    for idx, t in anchors.items():
        final_times[idx] = t

    anchor_indices = sorted(anchors.keys())
    if not anchor_indices:
        # no reliable anchors at all: uniform distribution across duration
        n = len(lines)
        return enforce_monotonic([i * (duration / max(1, n-1)) for i in range(n)])

    # HEAD region (before first anchor)
    first_idx = anchor_indices[0]
    first_t = anchors[first_idx]
    if first_idx > 0:
        active_head = subtract_long_silences_from_interval(0.0, first_t, long_silences)
        if active_head:
            # NOTE(review): unlike the BETWEEN and TAIL branches, this one does not
            # check that the total active length exceeds 1e-6.
            times_head = distribute_across_active_intervals(first_idx, active_head)
            for i, tt in enumerate(times_head):
                final_times[i] = tt
        else:
            # weighted equispacing: first line receives FIRST_LINE_WEIGHT portion
            # (reached only when no active interval is left before the first anchor)
            n = first_idx
            weights = [FIRST_LINE_WEIGHT] + [1] * (n - 1)
            total_weight = sum(weights)
            cum = 0.0
            for i, w in enumerate(weights):
                cum += w
                final_times[i] = (cum / total_weight) * first_t

    # BETWEEN anchors
    for a_i, b_i in zip(anchor_indices, anchor_indices[1:]):
        ta = anchors[a_i]; tb = anchors[b_i]
        # indices of the unanchored lines strictly between the two anchors
        segment = list(range(a_i+1, b_i))
        if not segment:
            continue
        active_intervals = subtract_long_silences_from_interval(ta, tb, long_silences)
        if active_intervals and sum((y-x) for (x,y) in active_intervals) > 1e-6:
            timestamps = distribute_across_active_intervals(len(segment), active_intervals)
            for li, ts in zip(segment, timestamps):
                final_times[li] = ts
        else:
            # No active audio between the anchors: step forward from the earlier anchor.
            spacing = 1.5
            cand_times = []
            for j in range(1, len(segment) + 1):
                cand = ta + j * spacing
                shifted = cand
                # a candidate landing inside a long silence is moved just past its end
                for s,e in long_silences:
                    if s < cand < e:
                        shifted = e + 0.05
                        break
                # never reach the next anchor: fall back to linear interpolation
                if shifted >= tb:
                    shifted = ta + (tb - ta) * (j / (len(segment) + 1))
                cand_times.append(min(tb - 0.01, shifted))
            for li, ts in zip(segment, cand_times):
                final_times[li] = ts

    # TAIL region (after last anchor)
    last_idx = anchor_indices[-1]
    last_t = anchors[last_idx]
    if last_idx < len(lines) - 1:
        active_tail = subtract_long_silences_from_interval(last_t, duration, long_silences)
        n_tail = len(lines) - 1 - last_idx
        if active_tail and sum((y-x) for (x,y) in active_tail) > 1e-6:
            times_tail = distribute_across_active_intervals(n_tail, active_tail)
            for k, tt in enumerate(times_tail, start=1):
                final_times[last_idx + k] = tt
        else:
            # Each line is placed FALLBACK_SPACING after the previous one; the previous
            # slot is always filled (anchor time for k == 1, prior iteration afterwards).
            for k in range(1, len(lines) - last_idx):
                final_times[last_idx + k] = min(duration, final_times[last_idx + k - 1] + FALLBACK_SPACING)

    # Any remaining None -> fallback uniform distribution
    for i in range(len(final_times)):
        if final_times[i] is None:
            final_times[i] = min(duration, i * (duration / max(1, len(lines) - 1)))

    # Final pass: bump any timestamp that does not advance past its predecessor.
    final_times = enforce_monotonic(final_times)
    return final_times

Model comparison

In [None]:
# -------------------------
# PER-FILE HIGH-LEVEL FLOW
# -------------------------
def process_file_with_model_comparison(flac_path: str, genius_client: Optional[Genius], artist_meta: Optional[str]=None, title_meta: Optional[str]=None, db_already_checked: bool=False) -> bool:
    """
    Process one audio file end to end and write its LRC file.

    Steps, in order:
      1. Resolve artist/title metadata (from the caller, else from the FLAC tags).
      2. Unless `db_already_checked`, try an exact restore from the lyrics DB.
      3. Obtain lyrics: local `<basename>.txt` in SONGS_FOLDER, else a similar DB
         entry (restored as-is), else Genius.
      4. Split the lyrics into lines and cluster repeated lines.
      5. Transcribe with every model in TRANSCRIBE_MODELS, compute anchors and
         interpolated times for each.
      6. Keep the model with the most anchors (ties broken by score sum), write
         the LRC and copy it into the lyrics DB when metadata is available.
    An entry is appended to the anchors log for every outcome (best-effort).

    Args:
        flac_path: path of the audio file to process.
        genius_client: Genius client, or None when unavailable.
        artist_meta: artist name if already known by the caller.
        title_meta: track title if already known by the caller.
        db_already_checked: True when the caller already attempted the exact DB restore.

    Returns:
        True when an LRC was restored from the DB (exact or similar), or when the
        chosen model produced at least one anchor. False when lyrics could not be
        obtained, when every model failed (a fallback equispaced LRC is still
        written), or when the chosen model produced no anchors.
    """
    basename = os.path.splitext(os.path.basename(flac_path))[0]
    txt_path = os.path.join(SONGS_FOLDER, f"{basename}.txt")
    lyrics = None

    # Only extract metadata if not provided by caller
    if artist_meta is None or title_meta is None:
        try:
            artist_meta, title_meta = extract_metadata_from_flac(flac_path)
        except Exception:
            artist_meta, title_meta = None, None
    else:
        dbg(f"metadata provided by caller: artist='{artist_meta}', title='{title_meta}'")

    lrc_target_path = os.path.join(LYRICS_FOLDER, f"{basename}.lrc")
    # Early restore: only attempt if caller did NOT already check the DB.
    if (not db_already_checked) and title_meta and artist_meta:
        try:
            if restore_from_lyrics_db(title_meta, artist_meta, lrc_target_path):
                info(f"Restored exact LRC from DB: '{title_meta}' - '{artist_meta}' (skipping transcription)")
                try:
                    append_anchor_entry(basename, "restored_from_db", f"{title_meta} - {artist_meta}")
                except Exception:
                    dbg("could not append restore entry to anchors log")
                return True
        except Exception as e:
            dbg(f"error during early restore: {e}")

    # If a local .txt with lyrics exists, use it
    if os.path.exists(txt_path):
        try:
            with open(txt_path, "r", encoding="utf-8") as f:
                lyrics = f.read()
            info(f"Using local lyrics file {txt_path}")
        except Exception as e:
            info(f"warning: could not read {txt_path}: {e}")

    # fallback: try to find a similar entry in the DB
    # (also taken when the local .txt exists but is empty, since `not lyrics` is then true)
    if not lyrics and title_meta and artist_meta:
        similar = search_similar_in_db(title_meta, artist_meta, similarity_threshold=0.82)
        if similar:
            try:
                shutil.copy2(similar, lrc_target_path)
                info(f"Restored similar LRC from DB: {os.path.basename(similar)}")
                copy_to_lyrics_db(lrc_target_path, title_meta, artist_meta)
                try:
                    append_anchor_entry(basename, "restored_similar_from_db", os.path.basename(similar))
                except Exception:
                    dbg("could not append similar restore entry to anchors log")
                return True
            except Exception as e:
                info(f"warning: error copying similar DB file: {e}")

    # If still not found, attempt Genius (if available)
    if not lyrics:
        if not title_meta:
            info(f"warning: missing title metadata for {basename}; cannot fetch lyrics")
            try:
                append_anchor_entry(basename, "lyrics retrieving failed", "missing title metadata")
            except Exception:
                dbg("could not append lyrics-failed entry")
            return False
        if not genius_client:
            info(f"warning: no Genius client and no local '{basename}.txt' found; skipping")
            try:
                append_anchor_entry(basename, "lyrics retrieving failed", "no genius client")
            except Exception:
                dbg("could not append lyrics-failed entry")
            return False
        try:
            # NOTE(review): `genius_client` is only checked for truthiness above;
            # get_genius_lyrics is not passed the client here.
            lyrics = get_genius_lyrics(artist_meta, title_meta)
            if not lyrics:
                info("warning: Genius returned empty lyrics")
                try:
                    append_anchor_entry(basename, "lyrics retrieving failed", "empty lyrics")
                except Exception:
                    dbg("could not append lyrics-failed entry")
                return False
            # Save raw lyrics to logs ONLY
            try:
                raw_path = os.path.join(LOGS_FOLDER, f"{basename}.raw_lyrics.txt")
                with open(raw_path, "w", encoding="utf-8") as rf:
                    rf.write(lyrics)
                dbg(f"saved raw lyrics into {raw_path}")
            except Exception as e:
                dbg(f"could not save raw lyrics: {e}")
        except Exception as e:
            info(f"error obtaining lyrics from Genius: {e}")
            try:
                append_anchor_entry(basename, "lyrics retrieving failed", str(e))
            except Exception:
                dbg("could not append lyrics-failed entry")
            return False

    # Split into cleaned lyric lines
    lines = split_lyrics_lines(lyrics)
    if not lines:
        info(f"warning: lyrics empty after splitting for {basename}")
        try:
            append_anchor_entry(basename, "lyrics retrieving failed", "no lines after split")
        except Exception:
            dbg("could not append lyrics-failed entry")
        return False

    # Cluster similar lines
    cluster_ids, clusters = cluster_similar_lines_robust(lines)
    # a cluster with more than one member marks lines that repeat (excluded from anchoring)
    repeated_cluster_ids = {cid for cid, members in clusters.items() if len(members) > 1}
    dbg(f"{len(clusters)} clusters detected, {len(repeated_cluster_ids)} are repeated clusters")

    # Transcribe with all configured models, save logs and compute anchors
    # model_results holds (model_name, data_dict) pairs; data is None when the model failed
    model_results = []
    for model_name in TRANSCRIBE_MODELS:
        try:
            dbg(f"transcribing {basename} using model '{model_name}'")
            transcribed = transcribe_with_whisper(flac_path, model_name)
            words = transcribed["words"]
            segments_meta = transcribed.get("segments", [])
            duration = transcribed.get("duration", words[-1][2] if words else 180.0)
            dbg(f"model '{model_name}' produced {len(words)} tokens, duration ~{duration:.2f}s")
            if not words:
                dbg(f"model '{model_name}' returned no words")
                model_results.append((model_name, None))
                continue

            save_transcription_files(basename, words, model_name)

            # Build segments_text using a fixed gap threshold (1.0s) for simplicity here
            # Each entry is (joined tokens, start of first token, end of last token).
            segments_text = []
            cur_words = [words[0][0]]
            cur_start = words[0][1]
            cur_end = words[0][2]
            for tok, s, e in words[1:]:
                gap = s - cur_end
                if gap > 1.0:
                    segments_text.append((" ".join(cur_words), cur_start, cur_end))
                    cur_words = [tok]
                    cur_start = s
                    cur_end = e
                else:
                    cur_words.append(tok)
                    cur_end = e
            # flush the segment still being built
            segments_text.append((" ".join(cur_words), cur_start, cur_end))

            long_silences = long_silences_from_words(words, threshold=LONG_SILENCE_THRESHOLD)

            anchors, score_sum, accepted_candidates, all_candidates = compute_anchors_from_transcription(
                lines, words, segments_text, long_silences, cluster_ids, repeated_cluster_ids
            )
            anchors_count = len(anchors)
            dbg(f"model '{model_name}' => anchors_count={anchors_count}, score_sum={score_sum:.3f}")

            save_model_logs_text(basename, model_name, segments_meta, all_candidates, accepted_candidates, anchors)

            final_times = compute_final_times_from_anchors(lines, anchors, words, long_silences, duration)
            model_results.append((model_name, {
                "anchors": anchors,
                "anchors_count": anchors_count,
                "score_sum": score_sum,
                "final_times": final_times,
                "words": words,
                "segments_text": segments_text,
                "duration": duration,
            }))
        except Exception as e:
            dbg(f"error with model '{model_name}': {e}")
            model_results.append((model_name, None))

    # Choose the best model by anchors_count first, then by score_sum
    # The initial metrics (-1, -1.0) are beaten by any successful model, even one with
    # zero anchors, so best_candidate stays None only when every model entry is None.
    best_candidate = None
    best_metrics = (-1, -1.0)
    for model_name, data in model_results:
        if not data:
            continue
        ac = data["anchors_count"]
        ss = data["score_sum"]
        if ac > best_metrics[0] or (ac == best_metrics[0] and ss > best_metrics[1]):
            best_metrics = (ac, ss)
            best_candidate = (model_name, data)

    if not best_candidate:
        info(f"No model produced valid anchors for '{basename}'. Writing fallback equispaced LRC.")
        # choose fallback duration from any model if available
        # NOTE(review): this branch is only reached when every `data` is None, so the
        # loop below finds nothing and the 180.0 default is what gets used.
        fallback_duration = 180.0
        for _, data in model_results:
            if data and data.get("duration"):
                fallback_duration = data["duration"]
                break
        times = [i * (fallback_duration / max(1, len(lines) - 1)) for i in range(len(lines))]
        times = enforce_monotonic(times)
        lrc_path = write_lrc(basename, lines, times)
        # NOTE(review): the unanchored fallback LRC is also stored in the lyrics DB.
        if title_meta and artist_meta:
            copy_to_lyrics_db(lrc_path, title_meta, artist_meta)
        info(f"Fallback LRC written: {lrc_path}")
        try:
            append_anchor_entry(basename, "all models failed", f"duration={fallback_duration:.1f}")
        except Exception:
            dbg("could not append all-models-failed entry")
        return False

    chosen_model, chosen_data = best_candidate
    info(f"Chosen model for '{basename}': {chosen_model} (anchors={chosen_data['anchors_count']}, score_sum={chosen_data['score_sum']:.3f})")

    final_times = chosen_data["final_times"]
    lrc_path = write_lrc(basename, lines, final_times)
    if title_meta and artist_meta:
        copy_to_lyrics_db(lrc_path, title_meta, artist_meta)
    info(f"LRC generated: {lrc_path}")

    # Log success with chosen model and some metrics
    try:
        extra = f"anchors={chosen_data['anchors_count']},score={chosen_data['score_sum']:.3f}"
        append_anchor_entry(basename, f"chosen:{chosen_model}", extra)
    except Exception:
        dbg("could not append chosen-model entry")

    # The LRC is written either way; the return value only reports whether it is anchored.
    return chosen_data["anchors_count"] > 0

Main loop

In [None]:
# -------------------------
# MAIN
# -------------------------
if __name__ == "__main__":
    # Script entry point: prepare folders and the run log, configure the optional
    # Genius client, then process every FLAC file found in SONGS_FOLDER and print
    # a summary.
    info("Anchors-Strict LRC Generator starting")
    initialize_folders()
    # Initialize anchors run log header (must run after LOGS_FOLDER exists)
    try:
        init_anchors_log()
    except Exception as e:
        dbg(f"init_anchors_log failed: {e}")
    ingest_existing_lrcs_and_cleanup_songs()

    # NOTE(review): load_credentials is defined elsewhere; `or {}` keeps the
    # .get() below from failing should it ever return None.
    creds = load_credentials(CREDENTIALS_PATH) or {}
    GENIUS_ACCESS_TOKEN = creds.get("genius_access_token")
    genius = None
    if GENIUS_ACCESS_TOKEN:
        genius = get_genius_client(GENIUS_ACCESS_TOKEN)
        if genius:
            info("Genius client configured")
            try:
                # quick connectivity sanity check
                _ = genius.search_songs("test", per_page=1)
                dbg("genius connectivity OK")
            except Exception as e:
                dbg(f"genius test issue: {e}")
        else:
            info("warning: Genius token provided but initialization failed")
    else:
        info("note: no Genius token in credentials.json; Genius lookups will be skipped")

    flac_files = find_flac_files(SONGS_FOLDER)
    if not flac_files:
        info(f"No FLAC files found in '{SONGS_FOLDER}'")
    else:
        info(f"Found {len(flac_files)} files in {SONGS_FOLDER}")
        processed = 0
        failed = 0
        no_anchor_files = []
        for i, flac in enumerate(flac_files, 1):
            # derive the names once per file instead of at every use
            flac_name = os.path.basename(flac)
            basename = os.path.splitext(flac_name)[0]
            info(f"[{i}/{len(flac_files)}] Processing: {flac_name}")

            # Early check: if DB already contains a matching Title-Artist entry restore and SKIP
            try:
                artist_meta, title_meta = extract_metadata_from_flac(flac)
            except Exception:
                artist_meta, title_meta = None, None

            if title_meta and artist_meta:
                lrc_target_path = os.path.join(LYRICS_FOLDER, f"{basename}.lrc")
                try:
                    if restore_from_lyrics_db(title_meta, artist_meta, lrc_target_path):
                        info(f"Restored from DB (skipping transcription): '{title_meta}' - '{artist_meta}'")
                        # Log the restore event in persistent anchors log
                        try:
                            append_anchor_entry(basename, "restored_from_db_mainloop", f"{title_meta} - {artist_meta}")
                        except Exception as e:
                            dbg(f"could not append mainloop restore entry: {e}")
                        processed += 1
                        continue
                except Exception as e:
                    dbg(f"error during early DB restore: {e}")

            # db_already_checked=True: the exact DB restore was attempted just above
            try:
                ok_has_anchors = process_file_with_model_comparison(flac, genius, artist_meta=artist_meta, title_meta=title_meta, db_already_checked=True)
                if ok_has_anchors:
                    processed += 1
                else:
                    no_anchor_files.append(flac_name)
                    failed += 1
            except Exception as e:
                info(f"error processing {flac_name}: {e}")
                failed += 1

        info("\nSummary:")
        info(f"  processed with anchors: {processed}")
        info(f"  without anchors / problematic: {failed}")
        # List the files that completed without anchors (previously collected but
        # never shown); files that raised were already reported above.
        for name in no_anchor_files:
            info(f"    - {name}")

        db_files = glob.glob(os.path.join(LYRICS_DB_FOLDER, "*.lrc"))
        dbg(f"DB files count: {len(db_files)}")

[DEBUG] raw transcription: 432 tokens, duration ~213.40s, 183 segments
[DEBUG] cleaned transcription: 309 tokens
[DEBUG] model 'large-v3' produced 309 tokens, duration ~213.40s
[DEBUG] saved transcript and timestamps for 071 - Echoes - Afinity Remix (large-v3) in .logs
[DEBUG] accepted anchors: 5, score_sum: 4.942
[DEBUG] model 'large-v3' => anchors_count=5, score_sum=4.942
[DEBUG] saved plain-text logs for 071 - Echoes - Afinity Remix (large-v3) at .logs\071 - Echoes - Afinity Remix.large-v3.log
[DEBUG] transcribing 071 - Echoes - Afinity Remix using model 'large-v3-turbo'
[DEBUG] loading model 'large-v3-turbo'
[DEBUG] transcribing with model 'large-v3-turbo'
[DEBUG] raw transcription: 378 tokens, duration ~213.40s, 82 segments
[DEBUG] cleaned transcription: 325 tokens
[DEBUG] model 'large-v3-turbo' produced 325 tokens, duration ~213.40s
[DEBUG] saved transcript and timestamps for 071 - Echoes - Afinity Remix (large-v3-turbo) in .logs
[DEBUG] accepted anchors: 8, score_sum: 7.891
[DEB