In [1]:
import tempfile
import os
import whisperx
import re
import difflib

# Install: pip install demucs
import torch
import torchaudio
from demucs.pretrained import get_model
from demucs.apply import apply_model

# Audio file to align; passed to align_lyrics_to_audio() at the bottom of this cell.
song_path = 'conviction.wav'
# Reference lyrics. Each non-empty line becomes one lyric phrase
# (see _split_lyrics_into_phrases); blank lines are dropped.
lyrics_text = """
Conviction in a couple different colors
Shoot a cannon at yourself
Or it's a credo that you holler
"""

# Options
use_vocal_separation = True  # when True, extract_vocals() runs first and ASR sees only the vocal stem
whisperx_model_size = "base.en"  # model name handed to whisperx.load_model
device = "cpu"  # device handed to the WhisperX transcription and alignment calls
short_word_bias = 0.175  # 0.0 = no bias (pure ASR durations); higher shrinks short words / extends long words within phrase


def extract_vocals(audio_path):
    """Separate the vocal stem of *audio_path* with Demucs and save it as a WAV.

    The input is adapted to what the pretrained model expects (sample rate and
    channel count) before separation, and the vocal stem is located by name
    rather than by a fixed index so a model with a different stem order still
    returns the right track.

    Args:
        audio_path: Path to an audio file readable by ``torchaudio.load``.

    Returns:
        Path (str) of a newly created temporary ``.wav`` file that holds the
        isolated vocals. The caller owns the file and should delete it.

    Raises:
        ValueError: If the loaded model does not expose a ``vocals`` stem.
    """
    print("Separating vocals from music with Demucs...")

    # Load model
    model = get_model('htdemucs')
    model.cpu()
    model.eval()

    # Load audio: wav is (channels, samples)
    wav, sr = torchaudio.load(audio_path)

    # The pretrained weights assume a fixed sample rate; feeding another rate
    # degrades the separation, so resample first.
    target_sr = model.samplerate
    if sr != target_sr:
        wav = torchaudio.functional.resample(wav, sr, target_sr)
        sr = target_sr

    # The model also expects a fixed channel count: duplicate a mono track,
    # and drop surplus channels of a multi-channel file.
    target_channels = model.audio_channels
    if wav.shape[0] == 1 and target_channels > 1:
        wav = wav.repeat(target_channels, 1)
    elif wav.shape[0] > target_channels:
        wav = wav[:target_channels]

    # Apply separation (wav[None] adds the batch dimension)
    with torch.no_grad():
        sources = apply_model(model, wav[None], device='cpu')[0]

    # Look the vocal stem up by name instead of relying on a magic index.
    stem_names = list(model.sources)
    if 'vocals' not in stem_names:
        raise ValueError(f"Demucs model has no 'vocals' stem (available: {stem_names})")
    vocals = sources[stem_names.index('vocals')]

    # Reserve a temp file name. The handle is closed before torchaudio writes
    # to the path, because Windows cannot reopen a file that is still open.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
        temp_vocals = tmp.name

    torchaudio.save(temp_vocals, vocals, sr)

    print(f"Vocals extracted to: {temp_vocals}")
    return temp_vocals


def _tokenize_words(text: str):
    """Return the lowercase word tokens found in *text*, in order.

    A token is a run of ASCII letters and apostrophes, so contractions such as
    "it's" or "you're" stay in one piece; every other character separates tokens.
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r"[A-Za-z']+", lowered)]


def _split_lyrics_into_phrases(lyrics_text: str):
    """Turn multi-line lyrics into a list of phrases, one per usable line.

    Each phrase is a dict ``{"text": <stripped line>, "words": <tokens>}``.
    Lines that are blank, or that yield no tokens, are left out.
    """
    phrases = []
    for raw_line in lyrics_text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        tokens = _tokenize_words(line)
        if tokens:
            phrases.append({"text": line, "words": tokens})
    return phrases


def _group_asr_into_phrases(whisperx_words, gap_threshold: float = 0.2):
    """Split a time-ordered ASR word list into phrases at silences.

    A new phrase starts whenever a word begins more than *gap_threshold*
    seconds after the previous word ended. Each phrase is a dict with the
    member word dicts under ``"words"`` plus the phrase ``"start"`` and
    ``"end"`` times. Empty input gives an empty list.
    """
    if not whisperx_words:
        return []

    grouped = []
    bucket = None
    for token in whisperx_words:
        if bucket is not None:
            silence = token["start"] - bucket["words"][-1]["end"]
            if not silence > gap_threshold:
                # Close enough to the previous word: same phrase.
                bucket["words"].append(token)
                bucket["end"] = token["end"]
                continue
            grouped.append(bucket)
        bucket = {"words": [token], "start": token["start"], "end": token["end"]}

    grouped.append(bucket)
    return grouped


def _norm(s: str) -> str:
    """Lowercase *s* and strip everything except a-z and apostrophes."""
    lowered = s.lower()
    disallowed = re.compile(r"[^a-z']+")
    return disallowed.sub("", lowered)


def _sim(a: str, b: str) -> float:
    """Return the difflib similarity ratio of *a* and *b* (1.0 = identical)."""
    matcher = difflib.SequenceMatcher(isjunk=None, a=a, b=b)
    return matcher.ratio()


def _segment_asr_groups(lyric_words, asr_words_in_phrase):
    """
    Greedily group contiguous ASR tokens to each lyric word.

    Lyric words are processed in order. Each one takes the next unconsumed ASR
    token and may keep absorbing following tokens while the concatenated,
    normalized text stays at least as similar to the lyric word, or while the
    match is still poor and the concatenation is not much longer than the word.

    Args:
        lyric_words: lyric tokens of one phrase, in order.
        asr_words_in_phrase: ASR word dicts of the matching phrase; the code
            reads the 'word', 'start' and 'end' keys of each.

    Returns:
        One dict per lyric word: {word, base_start, base_end, len_norm}, where
        base_start/base_end span the ASR tokens assigned to the word and
        len_norm is the length of the normalized lyric word (at least 1).
    """
    groups = []
    awords = asr_words_in_phrase
    j = 0  # index of the next ASR token not yet assigned to a lyric word
    for lw in lyric_words:
        lw_norm = _norm(lw)
        if j >= len(awords):
            # no ASR left; estimate tiny duration after previous
            if groups:
                base_start = groups[-1]['base_end']
            else:
                # NOTE(review): with no groups yet j is still 0, so this branch is
                # only reached when awords is empty and the value is always 0.0.
                base_start = awords[-1]['end'] if awords else 0.0
            # Fixed 0.3 s placeholder for a lyric word that has no ASR evidence.
            base_end = base_start + 0.3
            groups.append({"word": lw, "base_start": base_start, "base_end": base_end, "len_norm": len(lw_norm) or 1})
            continue
        g_start = j
        g_end = j
        current_concat = _norm(awords[g_end]['word'])
        # A lyric word that normalizes to "" is scored 1.0 against anything.
        best_sim = _sim(current_concat, lw_norm) if lw_norm else 1.0
        best_end = g_end
        while g_end + 1 < len(awords):
            trial = current_concat + _norm(awords[g_end + 1]['word'])
            trial_sim = _sim(trial, lw_norm) if lw_norm else 1.0
            len_ratio = len(trial) / max(1, len(lw_norm))
            # Absorb the next token when similarity does not drop, or when the
            # match is still weak (< 0.6) and the concatenation is under 1.3x
            # the lyric word's length.
            if trial_sim >= best_sim or (best_sim < 0.6 and len_ratio < 1.3):
                g_end += 1
                current_concat = trial
                # Despite the name this tracks the similarity of the accepted
                # concatenation; it can decrease when the second clause fires.
                best_sim = trial_sim
                best_end = g_end
            else:
                break
        base_start = float(awords[g_start]['start'])
        base_end = float(awords[best_end]['end'])
        groups.append({"word": lw, "base_start": base_start, "base_end": base_end, "len_norm": len(lw_norm) or 1})
        j = best_end + 1
    # If leftover ASR tokens remain, extend last group's end to include them
    if j < len(awords) and groups:
        groups[-1]['base_end'] = float(awords[-1]['end'])
    return groups


def _apply_duration_bias(groups, phrase_start, phrase_end, bias: float):
    """
    Re-time the lyric words of one phrase so they tile the phrase window exactly.

    Each word starts from its ASR-based base duration, which is multiplied by a
    length-based weight (longer-than-average words gain time, shorter ones lose
    it). The weighted durations are then rescaled so that together they fill
    ``[phrase_start, phrase_end]``. Scaling to the window, rather than to the
    sum of base durations, keeps every word at a positive length even when the
    base durations overrun the window, and spreads slack over all words
    instead of piling it onto the last one.

    Args:
        groups: dicts with 'word', 'base_start', 'base_end' and 'len_norm', in
            phrase order (the shape returned by _segment_asr_groups).
        phrase_start: start time of the phrase window, in seconds.
        phrase_end: end time of the phrase window, in seconds.
        bias: strength of the long>short emphasis; clamped to [0, 1].
            0 = durations proportional to the ASR base durations.

    Returns:
        List of {"word", "start", "end"} dicts, contiguous and in order, with
        the first start equal to phrase_start and the last end equal to
        phrase_end. Empty when *groups* is empty.
    """
    if not groups:
        return []

    # Values outside [0, 1] would make (1 - bias) negative or invert the
    # emphasis, which can yield negative durations.
    bias = min(1.0, max(0.0, float(bias)))
    window = max(0.0, phrase_end - phrase_start)

    # Base durations from ASR grouping, floored so no word starts at zero length.
    base_durs = [max(0.01, g['base_end'] - g['base_start']) for g in groups]

    # Weight = 1 blended toward (relative length ** gamma) by bias.
    mean_len = max(1.0, sum(g['len_norm'] for g in groups) / len(groups))
    gamma = 1.0 + bias
    weights = []
    for g in groups:
        rel = max(1.0, float(g['len_norm'])) / mean_len
        weights.append((1 - bias) + bias * (rel ** gamma))

    adjusted = [d * w for d, w in zip(base_durs, weights)]
    sum_adj = sum(adjusted)
    if sum_adj <= 1e-6:
        # Degenerate weights: fall back to the unweighted base durations.
        adjusted = base_durs
        sum_adj = sum(base_durs)

    # Rescale so the durations add up to the phrase window.
    scale = window / sum_adj

    t = phrase_start
    out = []
    for g, dur in zip(groups, adjusted):
        # min() guards against float drift pushing a word past the window.
        end = min(phrase_end, t + dur * scale)
        out.append({"word": g['word'], "start": t, "end": end})
        t = end
    # Pin the final boundary so rounding never leaves a gap at the phrase end.
    out[-1]['end'] = phrase_end
    return out


def _segment_asr_to_lyrics_with_bias(lyrics_phrases, asr_phrases, bias: float):
    """Produce word timings for every lyric phrase.

    Lyric phrase i is matched with ASR phrase i. For each matched pair the ASR
    tokens are grouped per lyric word and the durations are bias-adjusted
    inside the ASR phrase window; a pair is skipped when the lyric phrase has
    no words or the window is empty. Lyric phrases beyond the last ASR phrase
    get consecutive 0.3 s slots that start where the previous timing ended.

    Returns a flat list of {"word", "start", "end"} dicts.
    """
    result = []
    matched = min(len(lyrics_phrases), len(asr_phrases))

    for lyric_phrase, asr_phrase in zip(lyrics_phrases[:matched], asr_phrases[:matched]):
        lyric_words = lyric_phrase["words"]
        asr_tokens = asr_phrase["words"]
        window_start = float(asr_phrase['start'])
        window_end = float(asr_phrase['end'])
        if lyric_words and not window_end <= window_start:
            grouped = _segment_asr_groups(lyric_words, asr_tokens)
            result.extend(_apply_duration_bias(grouped, window_start, window_end, bias))

    if len(lyrics_phrases) <= matched:
        return result

    # Unmatched lyric phrases: continue from the last known point in time.
    if result:
        cursor = result[-1]['end']
    elif asr_phrases:
        cursor = asr_phrases[-1]['end']
    else:
        cursor = 0.0
    for leftover in lyrics_phrases[matched:]:
        for token in leftover['words']:
            stop = cursor + 0.3
            result.append({"word": token, "start": cursor, "end": stop})
            cursor = stop
    return result


def align_lyrics_to_audio(audio_path, lyrics_text, use_vocal_separation=True):
    """
    Align YOUR lyrics to the audio timing using WhisperX: first group ASR tokens to lyric
    words (preserving ASR-based durations), then apply a short-word bias that slightly
    reduces durations of short/function words and increases durations of longer words,
    while preserving each phrase's total duration.

    Reads the module-level settings ``whisperx_model_size``, ``device`` and
    ``short_word_bias``.

    Args:
        audio_path: path of the song to align.
        lyrics_text: lyrics, one phrase per line.
        use_vocal_separation: when True, Demucs isolates the vocals first and
            WhisperX runs on that stem; the temporary stem file is deleted once
            the audio has been loaded into memory.

    Returns:
        A list of word timings for YOUR lyrics: dicts with 'word', 'start', 'end'.

    Raises:
        ValueError: if ``lyrics_text`` contains no usable lyric line.
    """
    # Step 1: Extract vocals if enabled
    temp_vocals_path = extract_vocals(audio_path) if use_vocal_separation else None
    audio_to_use = temp_vocals_path if temp_vocals_path is not None else audio_path

    try:
        # Step 2: Load WhisperX model
        print(f"Loading WhisperX model ({whisperx_model_size})...")
        model = whisperx.load_model(whisperx_model_size, device, compute_type="float32")

        # Step 3: Load audio (decoded fully into memory)
        audio = whisperx.load_audio(audio_to_use)
    finally:
        # The separated stem is only needed until it has been decoded; remove it
        # even when model loading fails so repeated runs do not pile up temp files.
        if temp_vocals_path is not None:
            try:
                os.remove(temp_vocals_path)
            except OSError as exc:
                print(f"Warning: could not remove temporary vocals file {temp_vocals_path}: {exc}")

    # Step 4: Transcribe to get initial segments
    print("Transcribing audio...")
    result = model.transcribe(audio, batch_size=16, language="en")

    # Step 5: Load alignment model for word-level timestamps
    print("Loading alignment model...")
    model_a, metadata = whisperx.load_align_model(language_code="en", device=device)

    # Step 6: Align to get precise word timings
    print("Aligning words...")
    result_aligned = whisperx.align(
        result["segments"],
        model_a,
        metadata,
        audio,
        device,
        return_char_alignments=False
    )

    # Step 7: Extract word timings from WhisperX
    whisperx_words = []
    untimed_words = 0
    for segment in result_aligned.get("segments", []):
        for word_info in segment.get("words", []):
            text = word_info.get('word')
            if text is None:
                continue
            start = word_info.get('start')
            end = word_info.get('end')
            if start is None or end is None:
                # WhisperX leaves out timestamps for words it could not align
                # (e.g. numerals); skip them instead of raising KeyError.
                untimed_words += 1
                continue
            whisperx_words.append({
                'word': text.strip().lower(),
                'start': float(start),
                'end': float(end)
            })
    print(f"Extracted {len(whisperx_words)} words from WhisperX")
    if untimed_words:
        print(f"Skipped {untimed_words} ASR words without timestamps")
    if not whisperx_words:
        print("Warning: no timed ASR words found; lyric timings will be placeholder estimates.")
    # Optional debug (first 50)
    for idx, w in enumerate(whisperx_words[:50]):
        print(f"{idx}: '{w['word']}' - {w['start']:.2f}s to {w['end']:.2f}s")

    # Step 8: Build lyric phrases (lines)
    lyric_phrases = _split_lyrics_into_phrases(lyrics_text)
    if not lyric_phrases:
        raise ValueError("No lyric phrases found. Provide non-empty lyrics_text.")

    # Step 9: Group ASR words into phrases by time gaps
    asr_phrases = _group_asr_into_phrases(whisperx_words, gap_threshold=0.2)
    if not asr_phrases and whisperx_words:
        # Safety net: treat all ASR words as a single phrase.
        asr_phrases = [{"words": whisperx_words, "start": whisperx_words[0]['start'], "end": whisperx_words[-1]['end'] }]

    # Step 10: Segment & bias-adjust using ASR base durations
    word_timings = _segment_asr_to_lyrics_with_bias(lyric_phrases, asr_phrases, short_word_bias)

    print(f"\nAligned {len(word_timings)} lyric words using ASR duration + short-word bias (bias={short_word_bias}).")
    return word_timings


# Run the alignment
# word_timings is a module-level result; the video-rendering cell further down
# reads it (as word_timings_in), so this cell must finish successfully first.
word_timings = align_lyrics_to_audio(song_path, lyrics_text, use_vocal_separation)

# Display results
# One line per lyric word: index, word, start and end in seconds (2 decimals).
print("\nYour lyrics with timings:")
for i, word_data in enumerate(word_timings):
    print(f"{i}: '{word_data['word']}' - {word_data['start']:.2f}s to {word_data['end']:.2f}s")


Separating vocals from music with Demucs...




Vocals extracted to: C:\Users\marcu\AppData\Local\Temp\tmpf9u7vh97.wav
Loading WhisperX model (base.en)...


  import pkg_resources
  from .autonotebook import tqdm as notebook_tqdm


SystemError: initialization of _internal failed without raising an exception

In [None]:
import cv2
import numpy as np
import math
from pathlib import Path
import subprocess

# Inputs
output_dir = Path("out")
output_dir.mkdir(parents=True, exist_ok=True)  # create the output folder if missing
video_fps = 30
width, height = 1920, 1080  # frame size in pixels

# Font configuration
# If font_path points to a valid .ttf/.otf, we will use PIL TrueType at font_size_px.
# Otherwise we fall back to OpenCV Hershey using hershey_scale.
font_path = r"KGRedHands.ttf"  # set to None or to a valid TTF path
font_size_px = 120  # applies when using TrueType font via PIL
hershey_scale = 2.5  # applies when using OpenCV Hershey fonts

# Text colours are 3-tuples applied to the frame array's channels in order.
# NOTE(review): frames go through cv2.VideoWriter, which presumably expects BGR;
# irrelevant for white/black, confirm before using other colours.
font_color = (255, 255, 255)
font_thickness = 3
stroke_color = (0, 0, 0)
stroke_thickness = 6  # 0 disables the outline in draw_centered_text
line_type = cv2.LINE_AA

# Source audio and timings from previous cells
source_audio = song_path  # from earlier cell
word_timings_in = word_timings  # from earlier cell (list of dicts: word,start,end)

# Video temp paths
temp_video_path = output_dir / "lyrics_black_temp.mp4"  # silent video written by OpenCV
final_video_path = output_dir / "lyrics_black_with_audio.mp4"  # ffmpeg output with audio muxed in

# Prepare VideoWriter with the "mp4v" FourCC.
# NOTE(review): there is no codec fallback and the writer is never checked with
# isOpened(); if the codec is unavailable the frames are presumably dropped
# silently — confirm and add a check if that matters.
fourcc = cv2.VideoWriter.fourcc(*"mp4v")
writer = cv2.VideoWriter(str(temp_video_path), fourcc, video_fps, (width, height))

# Decide which font pipeline to use
# PIL is used only when font_path is set AND the file exists.
use_pil_font = bool(font_path) and Path(font_path).exists()
if not use_pil_font and font_path:
    print(f"Warning: font_path not found: {font_path}. Falling back to OpenCV Hershey font.")

# Loaded TrueType fonts keyed by (path, size). A value of None records a failed
# load, so the font file is read (and a failure reported) once, not once per frame.
_truetype_font_cache = {}


def _draw_centered_hershey(img, text):
    """Draw *text* centered on *img* with the OpenCV Hershey font (uses hershey_scale)."""
    font = cv2.FONT_HERSHEY_SIMPLEX
    (tw, th), _ = cv2.getTextSize(text, font, hershey_scale, font_thickness)
    x = (width - tw) // 2
    # putText anchors at the text baseline, hence (height + th) for vertical centering.
    y = (height + th) // 2
    if stroke_thickness > 0:
        cv2.putText(img, text, (x, y), font, hershey_scale, stroke_color, stroke_thickness, line_type)
    cv2.putText(img, text, (x, y), font, hershey_scale, font_color, font_thickness, line_type)


def _get_truetype_font(path, size_px):
    """Return the cached PIL font for (path, size_px), or None if it cannot be loaded."""
    key = (str(path), size_px)
    if key not in _truetype_font_cache:
        from PIL import ImageFont
        try:
            _truetype_font_cache[key] = ImageFont.truetype(path, size_px)
        except Exception as e:
            # Deliberately broad: any load failure should degrade to Hershey
            # rather than abort the render.
            print(f"Failed to load TrueType font '{path}': {e}. Falling back to Hershey.")
            _truetype_font_cache[key] = None
    return _truetype_font_cache[key]


# Helper to draw centered text with optional stroke
def draw_centered_text(img, text):
    """Draw *text* centered on *img* in place, with an outline when stroke_thickness > 0.

    Uses the TrueType font at font_path (size font_size_px) when use_pil_font is
    set and the font loads; otherwise uses the OpenCV Hershey font.

    Args:
        img: frame array of shape (height, width, 3), modified in place.
        text: the string to draw.
    """
    if not use_pil_font:
        _draw_centered_hershey(img, text)
        return

    fnt = _get_truetype_font(font_path, font_size_px)
    if fnt is None:
        _draw_centered_hershey(img, text)
        return

    # PIL TrueType path (uses font_size_px)
    from PIL import Image, ImageDraw
    pil_img = Image.fromarray(img)
    draw = ImageDraw.Draw(pil_img)

    # Measure text. The bounding box of text drawn at (0, 0) generally does not
    # start at (0, 0), so subtract its left/top offset to center the visible ink.
    left, top, right, bottom = draw.textbbox((0, 0), text, font=fnt)
    tw, th = right - left, bottom - top
    x = (width - tw) // 2 - left
    y = (height - th) // 2 - top

    # crude stroke by drawing offset shadows
    if stroke_thickness > 0:
        r = max(1, stroke_thickness // 2)
        for dx, dy in [(-r, 0), (r, 0), (0, -r), (0, r), (-r, -r), (-r, r), (r, -r), (r, r)]:
            draw.text((x + dx, y + dy), text, font=fnt, fill=stroke_color)
    draw.text((x, y), text, font=fnt, fill=font_color)
    # Copy the rendered pixels back into the caller's array.
    img[:] = np.array(pil_img)

# Normalize/clean timings
def clamp(v, lo, hi):
    """Limit *v* to the range [lo, hi]; *lo* is returned when lo > hi."""
    capped = min(hi, v)
    return max(lo, capped)

# Build a cleaned copy of the timings: numeric start/end, end strictly after start.
valid_timings = []
skipped_timings = 0
for w in word_timings_in:
    try:
        s = float(w.get('start', 0.0))
        e = float(w.get('end', s + 0.25))
    except (AttributeError, TypeError, ValueError):
        # Not a dict, or start/end not convertible to float: count and skip
        # instead of dropping the entry without a trace.
        skipped_timings += 1
        continue
    if e <= s:
        # Give zero/negative-length words a short visible duration.
        e = s + 0.25
    valid_timings.append({'word': str(w.get('word', '')), 'start': s, 'end': e})

if skipped_timings:
    print(f"Warning: skipped {skipped_timings} malformed word timing entries.")

if not valid_timings:
    raise RuntimeError(
        "word_timings is empty. Please ensure you have:\n"
        "1. Run the word alignment cell that populates 'word_timings'\n"
        "2. Verified that the alignment completed successfully\n"
        f"Current word_timings length: {len(word_timings_in)}"
    )

# Use the latest end time rather than the last entry's, so the video is not
# cut short if the timings are not strictly ordered.
total_duration = max(w['end'] for w in valid_timings)

def frame_for_time(t):
    """Render the frame shown at time *t* (seconds).

    The frame is black; if some word's [start, end) interval contains *t*, the
    first such word in valid_timings is drawn centered on it.
    """
    current = next(
        (entry for entry in valid_timings if entry['start'] <= t < entry['end']),
        None,
    )
    canvas = np.zeros((height, width, 3), dtype=np.uint8)
    if current:
        draw_centered_text(canvas, current['word'])
    return canvas

# Render frames
# One frame per 1/video_fps seconds, covering [0, total_duration); the frame
# count is rounded up so the last word is not cut short.
num_frames = int(math.ceil(total_duration * video_fps))
for i in range(num_frames):
    t = i / video_fps  # timestamp of frame i in seconds
    frame = frame_for_time(t)
    writer.write(frame)

# Release the writer before ffmpeg reads the file.
writer.release()
print(f"Wrote silent video: {temp_video_path}")

# Mux audio using ffmpeg
# - Copy video stream, re-encode audio to AAC for compatibility
# - Shorten to the shorter of video/audio so they end together
# - "-y" overwrites an existing output file without prompting
ffmpeg_cmd = [
    "ffmpeg",
    "-y",
    "-i", str(temp_video_path),
    "-i", str(source_audio),
    "-c:v", "copy",
    "-c:a", "aac",
    "-shortest",
    str(final_video_path)
]
print("Running:", " ".join(ffmpeg_cmd))
try:
    # check=True raises CalledProcessError on a non-zero exit status; only a
    # missing ffmpeg executable is handled here, other failures propagate.
    subprocess.run(ffmpeg_cmd, check=True)
    print(f"Wrote final video with audio: {final_video_path}")
except FileNotFoundError:
    print("ffmpeg not found on PATH. Please install FFmpeg or add it to PATH.")

Wrote silent video: out\lyrics_black_temp.mp4
Running: ffmpeg -y -i out\lyrics_black_temp.mp4 -i conviction.wav -c:v copy -c:a aac -shortest out\lyrics_black_with_audio.mp4
Wrote final video with audio: out\lyrics_black_with_audio.mp4
Wrote final video with audio: out\lyrics_black_with_audio.mp4
