# Rhythmodoro: Vibe-aware playlist retrieval

- Connects to Milvus and loads the `embedded_music_data` collection.
- Shows vibe counts.
- Configures retrieval and helpers (LLM time estimate, search, time-accurate capping).
- Defines `run_playlist()` to execute end-to-end.
- At the bottom, set your inputs and run to get:
  - "This task should take N songs to complete!"
  - Estimated task duration
  - Suggested songs with individual durations
  - Sum of durations

In [104]:
from pymilvus import connections, Collection

connections.connect(alias="default", host="localhost", port="19530")

In [105]:
collection = Collection("embedded_music_data")

In [106]:
collection.load()

In [107]:
# Retrieval configuration and helpers (shared by run_playlist)
# Use a task description -> estimate minutes with local LLM (Qwen)
import os, json, re, requests

# Global toggles and defaults used by run_playlist
use_task_time = True                 # enable LLM-based time estimation
time_budget_mode = True              # cap by cumulative duration when we have a minutes estimate
budget_overshoot_ms = 15_000         # tolerate slight overshoot to avoid underfilling
max_playlist_songs = 50              # safety cap on playlist length

# Fallbacks when no task time available
result_limit = 10                    # default song count if no estimate
blended_max_ratio = 0.4              # allow up to 40% 'Blended' in results
fetch_multiplier = 5                 # oversample candidates for post-filtering

# Local LLM (Ollama) configuration
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:3b-instruct")

def ollama_up(base: str) -> bool:
    try:
        r = requests.get(f"{base}/api/tags", timeout=3)
        return r.status_code == 200
    except Exception:
        return False

def ollama_generate(prompt: str, model: str, expect_json: bool = False, num_predict: int = 64) -> str:
    url = f"{OLLAMA_HOST}/api/generate"
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.2, "num_predict": num_predict}
    }
    if expect_json:
        payload["format"] = "json"
    r = requests.post(url, json=payload, timeout=45)
    r.raise_for_status()
    data = r.json()
    return (data.get("response") or "").strip()

def estimate_minutes_from_task(desc: str) -> int:
    if not desc or not desc.strip():
        return 0
    prompt = (
        "Estimate the total time required for this task in minutes. "
        "Return ONLY valid JSON like {\"minutes\": 45}.\nTask: " + desc.strip()
    )
    try:
        text = ollama_generate(prompt, OLLAMA_MODEL, expect_json=True, num_predict=64)
        # Parse JSON
        try:
            obj = json.loads(text)
        except Exception:
            # Try to extract JSON object if wrapped
            m = re.search(r"\{[^\}]*\}", text)
            obj = json.loads(m.group(0)) if m else {}
        minutes = int(float(obj.get("minutes", 0))) if isinstance(obj, dict) else 0
        return max(0, minutes)
    except Exception:
        return 0

def sample_avg_song_minutes(sample_size: int = 5000) -> float:
    try:
        MAX_QUERY_WINDOW = 16384
        lim = min(sample_size, MAX_QUERY_WINDOW)
        rows = collection.query(expr="duration_ms >= 0", output_fields=["duration_ms"], limit=lim)
        if not rows:
            return 3.5
        vals = [r.get("duration_ms") for r in rows if r.get("duration_ms") is not None]
        if not vals:
            return 3.5
        avg_ms = sum(vals) / len(vals)
        return max(1e-6, avg_ms / 60000.0)
    except Exception:
        return 3.5

Using filter expr: vibe in ["Groove", "Blended"]
{'target_vibe': 'Groove', 'include_blended': True, 'result_limit': 7, 'blended_max_ratio': 0.4, 'fetch_multiplier': 5, 'task_description': 'Clean living room', 'minutes_estimate': 30, 'avg_song_minutes': 3.879618376666667, 'playlist_count': 7, 'time_budget_mode': True, 'budget_overshoot_ms': 15000}


In [108]:
# Seed selection and vibe-aware retrieval with blended cap
from typing import List, Optional, Dict, Any

# Helper to get one entity by expr safely
def get_one(expr: str):
    try:
        res = collection.query(expr=expr, output_fields=["embedding", "vibe", "name", "artists"], limit=1)
        return res[0] if res else None
    except Exception:
        return None

# Utility to format milliseconds as M:SS
def format_ms(ms: int) -> str:
    try:
        total_sec = int(round(ms / 1000))
        m, s = divmod(total_sec, 60)
        return f"{m}:{s:02d}"
    except Exception:
        return "0:00"

# Normalize artists to a list of strings, handling strings, JSON strings, or lists
def normalize_artists(a) -> List[str]:
    if a is None:
        return []
    if isinstance(a, list):
        return [str(x) for x in a]
    if isinstance(a, str):
        s = a.strip()
        # Try parsing JSON-like lists
        if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")):
            try:
                obj = json.loads(s)
                if isinstance(obj, list):
                    return [str(x) for x in obj]
            except Exception:
                pass
        # Fallback: comma-separated
        if "," in s:
            parts = [p.strip() for p in s.split(",") if p.strip()]
            if parts:
                return parts
        return [s]
    # Any other type
    return [str(a)]

# Primary artist for a hit (first artist if available)
def _primary_artist(hit) -> str:
    arts = normalize_artists(hit.entity.get('artists'))
    return arts[0] if arts else ""

# Reorder list so the same primary artist doesn't appear consecutively
def reorder_no_adjacent_same_artist(hits: List) -> List:
    pending = list(hits)
    out: List = []
    while pending:
        if not out:
            out.append(pending.pop(0))
            continue
        last = _primary_artist(out[-1])
        idx = None
        for i, h in enumerate(pending):
            if _primary_artist(h) != last:
                idx = i
                break
        if idx is None:
            # Cannot avoid; append next best
            out.append(pending.pop(0))
        else:
            out.append(pending.pop(idx))
    return out

# Main workflow: builds filter, selects seed, searches, applies blended cap and time-accurate capping
# Returns a dict with details and also prints a friendly summary.
def run_playlist(
    task_description: Optional[str],
    target_vibe: Optional[str],
    *,
    include_blended: bool = True,
    time_budget_mode: bool = True,
    blended_max_ratio: float = 0.4,
    fetch_multiplier: int = 5,
    max_playlist_songs: int = 50,
    budget_overshoot_ms: int = 15_000,
    default_count: int = 10,
    seed_song_name: Optional[str] = None,
    avoid_adjacent_same_artist: bool = True,
    verbose: bool = False,
) -> Dict[str, Any]:
    # Compute minutes estimate and average song length
    minutes_estimate = 0
    avg_song_minutes = None
    if task_description and use_task_time and ollama_up(OLLAMA_HOST):
        minutes_estimate = estimate_minutes_from_task(task_description) or 0
        if minutes_estimate > 0:
            avg_song_minutes = sample_avg_song_minutes()

    # Recommend a count based on minutes/average (for messaging, fetch sizing, and blended cap)
    if minutes_estimate > 0 and avg_song_minutes:
        recommended_count = int(max(1, min(max_playlist_songs, round(minutes_estimate / max(1e-6, avg_song_minutes)))))
    else:
        recommended_count = default_count

    # Build filter expression
    filter_expr = None
    if target_vibe and isinstance(target_vibe, str) and target_vibe.strip():
        filter_expr = f'vibe in ["{target_vibe}", "Blended"]' if include_blended else f'vibe == "{target_vibe}"'

    # Seed selection
    seed_row = None
    if seed_song_name:
        seed_row = get_one(f'name == "{seed_song_name}"')
    else:
        if target_vibe:
            seed_row = get_one(f'vibe == "{target_vibe}"')
        if not seed_row:
            seed_row = get_one('vibe != "" and vibe != "Blended"') or get_one('vibe != ""')
    if not seed_row:
        raise ValueError("No seed candidate found. Check that the collection has 'vibe' populated.")

    if verbose:
        print(f"Using seed: {seed_row['name']} | Vibe={seed_row.get('vibe')}")

    query_embedding = seed_row["embedding"]

    # Prepare search params and fetch a larger pool
    search_params = {"metric_type": "L2", "params": {"ef": 64}}
    MAX_QUERY_WINDOW = 16384
    fetch_k = max(recommended_count * fetch_multiplier, recommended_count)
    fetch_k = min(fetch_k, MAX_QUERY_WINDOW)

    raw_results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param=search_params,
        limit=fetch_k,
        output_fields=["id", "name", "artists", "vibe", "duration_ms", "embedding_json"],
        expr=filter_expr
    )[0]

    # Post-process: blended cap and capping strategy
    max_blended = int(round(blended_max_ratio * recommended_count)) if include_blended else 0
    final: List = []
    blended_used = 0

    use_time_budget = time_budget_mode and (minutes_estimate or 0) > 0
    if use_time_budget:
        target_ms = int(minutes_estimate * 60_000)
        running_ms = 0
        for hit in raw_results:
            this_vibe = hit.entity.get('vibe') or ''
            is_blended = (this_vibe == 'Blended')
            if is_blended and blended_used >= max_blended:
                continue
            dur = hit.entity.get('duration_ms')
            if not isinstance(dur, (int, float)) or dur <= 0:
                dur = int(3.5 * 60_000)
            if running_ms + dur > target_ms + budget_overshoot_ms:
                break
            final.append(hit)
            running_ms += dur
            if is_blended:
                blended_used += 1
            if len(final) >= max_playlist_songs:
                break
        # Ensure at least one if everything was too long but within overshoot
        if not final and len(raw_results) > 0:
            top = raw_results[0]
            top_dur = top.entity.get('duration_ms') or int(3.5 * 60_000)
            if top_dur <= target_ms + budget_overshoot_ms:
                final = [top]
    else:
        for hit in raw_results:
            this_vibe = hit.entity.get('vibe') or ''
            is_blended = (this_vibe == 'Blended')
            if is_blended and blended_used >= max_blended:
                continue
            final.append(hit)
            if is_blended:
                blended_used += 1
            if len(final) >= recommended_count:
                break

    # Reorder to avoid adjacent same artist if requested
    if avoid_adjacent_same_artist and len(final) > 1:
        final = reorder_no_adjacent_same_artist(final)

    # Summaries
    total_ms = sum((h.entity.get('duration_ms') or int(3.5 * 60_000)) for h in final)
    selected_count = len(final)

    # Friendly outputs: always report selected_count
    print(f"This task should take {selected_count} songs to complete!")
    if minutes_estimate > 0 and avg_song_minutes:
        print(f"Estimated task duration: {minutes_estimate} minutes (avg song ~ {avg_song_minutes:.2f} min)")
    else:
        print("No task time estimate available; using default count.")

    print(f"Selected {selected_count} songs | Filled duration: {format_ms(total_ms)} ({total_ms} ms)")
    if target_vibe:
        print(f"Vibe filter: {target_vibe} (include_blended={include_blended}, blended_used={blended_used})")

    # Detailed list
    for idx, hit in enumerate(final, 1):
        dur = hit.entity.get('duration_ms') or int(3.5 * 60_000)
        artists_list = normalize_artists(hit.entity.get('artists'))
        print(f"{idx:2d}. {hit.entity.get('name')} — {', '.join(artists_list)} | {hit.entity.get('vibe')} | {format_ms(dur)}")

    return {
        "songs": [
            {
                "name": h.entity.get('name'),
                "artists": normalize_artists(h.entity.get('artists')),
                "vibe": h.entity.get('vibe'),
                "duration_ms": h.entity.get('duration_ms') or int(3.5 * 60_000),
                "score": h.score,
            }
            for h in final
        ],
        "total_ms": total_ms,
        "selected_count": selected_count,
        "recommended_count": recommended_count,
        "minutes_estimate": minutes_estimate,
        "avg_song_minutes": avg_song_minutes,
        "target_vibe": target_vibe,
        "include_blended": include_blended,
        "avoid_adjacent_same_artist": avoid_adjacent_same_artist,
    }

In [109]:
# Inputs: set your task and vibe here, then run the next cell
USER_task_description = "45-minute commute with upbeat energy"
USER_target_vibe = "Upbeat"  # e.g., "Chill", "Pop", "Dance", "Acoustic", "Groove", "Lofi", "Soft Haze", "Pump Up", "Midnight Blues"
USER_include_blended = True
USER_time_budget_mode = True
USER_avoid_adjacent_same_artist = True
USER_verbose = False

print("Inputs ready. Edit USER_* variables and run the next cell to generate the playlist.")

Inputs ready. Edit USER_* variables and run the next cell to generate the playlist.


In [110]:
# Run the workflow using the inputs above
res = run_playlist(
    task_description=USER_task_description,
    target_vibe=USER_target_vibe,
    include_blended=USER_include_blended,
    time_budget_mode=USER_time_budget_mode,
    blended_max_ratio=blended_max_ratio,
    fetch_multiplier=fetch_multiplier,
    max_playlist_songs=max_playlist_songs,
    budget_overshoot_ms=budget_overshoot_ms,
    default_count=result_limit,
    avoid_adjacent_same_artist=USER_avoid_adjacent_same_artist,
    verbose=USER_verbose,
)

# Recap: Explicit success line and totals
print("\n=== Summary ===")
print(f"This task should take {res.get('selected_count')} songs to complete!")
print(f"Estimated task duration: {res.get('minutes_estimate') or 0} minutes")
ms = int(res.get('total_ms') or 0)
mins, secs = divmod(round(ms / 1000), 60)
hours, mins = divmod(mins, 60)
print(f"Playlist suggestions: {res.get('selected_count')} songs, total duration {hours:02d}:{mins:02d}:{secs:02d} ({ms} ms)")

This task should take 9 songs to complete!
Estimated task duration: 45 minutes (avg song ~ 3.88 min)
Selected 9 songs | Filled duration: 43:44 (2624148 ms)
Vibe filter: Upbeat (include_blended=True, blended_used=0)
 1. Finding Her — ['Boz Scaggs'] | Upbeat | 3:58
 2. Where My Girls At — ['702'] | Upbeat | 2:47
 3. Georgia — ['Boz Scaggs'] | Upbeat | 3:55
 4. What Once Was — Her's | Upbeat | 4:15
 5. Loan Me a Dime — ['Boz Scaggs'] | Upbeat | 12:32
 6. High You Are — ['What So Not', 'Branchez'] | Upbeat | 3:33
 7. Finding My Way — ['Rush'] | Upbeat | 5:06
 8. What You See — ['Oingo Boingo'] | Upbeat | 3:55
 9. Little Girls — ['Oingo Boingo'] | Upbeat | 3:44

=== Summary ===
This task should take 9 songs to complete!
Estimated task duration: 45 minutes
Playlist suggestions: 9 songs, total duration 00:43:44 (2624148 ms)
