## 0. Setup
Import the Python modules used throughout the notebook. They depend on the packages listed in the README (pandas, numpy, mutagen, unidecode); the bootstrap cell below installs any that are missing.

### Package bootstrap
Install any missing Python packages required by this workflow so the import cell succeeds even on a fresh environment.

In [None]:
import importlib
import subprocess
import sys

# Maps each importable module name to the name passed to ``pip install``
# (the two are spelled differently for some packages, e.g. unidecode/Unidecode).
REQUIRED_PACKAGES = {
    "pandas": "pandas",
    "numpy": "numpy",
    "mutagen": "mutagen",
    "unidecode": "Unidecode",
}

for module_name, install_name in REQUIRED_PACKAGES.items():
    try:
        importlib.import_module(module_name)
    except ImportError:
        print(f"Installing missing dependency: {install_name}")
        # Run pip through the current interpreter so the package is installed
        # into the same environment this session imports from.
        subprocess.check_call([sys.executable, "-m", "pip", "install", install_name])
        # The import system caches what it has already found on disk; refresh
        # it so the freshly installed package is visible to later imports in
        # this same session.
        importlib.invalidate_caches()
print("Dependency check complete.")

In [None]:
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Optional

import math
import re
import pandas as pd
import numpy as np
from mutagen import File as MutagenFile
from unidecode import unidecode

## 1. Configuration
Define key directories (relative to the repository root) and ensure the output folder for reports exists.

In [None]:
# All locations hang off the current working directory; edit the folder
# names below if you relocate anything.
REPO_ROOT = Path.cwd()
DOWNLOAD_ROOT = REPO_ROOT.joinpath("Downloaded")
SPOTIFY_PLAYLISTS = REPO_ROOT.joinpath("spotify_playlists")
SHOPPING_LIST_DIR = REPO_ROOT.joinpath("shopping_lists")
LIBRARY_INDEX_CSV = REPO_ROOT.joinpath("library_index.csv")

# Reports are written into this folder later on, so create it up front.
SHOPPING_LIST_DIR.mkdir(exist_ok=True)

# Echo the resolved locations, one per line.
print(
    f"Repository root: {REPO_ROOT}",
    f"Download library: {DOWNLOAD_ROOT}",
    f"Spotify playlist CSVs: {SPOTIFY_PLAYLISTS}",
    f"Shopping/output directory: {SHOPPING_LIST_DIR}",
    sep="\n",
)

## 2. Helper functions
Canonicalization helpers keep matching consistent between Spotify exports and local audio metadata.

In [None]:
# One or more characters that are not a lowercase ASCII letter or digit.
NON_ALNUM = re.compile(r"[^a-z0-9]+")
# A parenthesised "(feat. ...)" credit, matched case-insensitively.
FEAT_PATTERN = re.compile(r"\(feat\..*?\)", re.IGNORECASE)
# A "-" followed by remaster/remastered/remix/edit/mix and everything after it,
# matched case-insensitively.
REMIX_PATTERN = re.compile(r"-\s*(remaster(ed)?|remix|edit|mix).*", re.IGNORECASE)
# File suffixes (lowercase, with leading dot) that count as audio files.
AUDIO_EXTENSIONS = {'.mp3', '.flac', '.m4a', '.aac', '.ogg', '.wav', '.aiff'}

def canonicalize_string(value: Optional[str]) -> str:
    """Reduce an artist or title string to a normalized comparison key.

    The value is passed through ``unidecode``, stripped of anything matching
    ``FEAT_PATTERN`` and ``REMIX_PATTERN``, lowercased, and every run of
    characters outside ``a-z0-9`` is replaced by a single space. Leading and
    trailing spaces are removed.

    Args:
        value: Raw text; non-string values are converted with ``str()``.

    Returns:
        The canonical key, or ``""`` for empty or missing input (``None``,
        ``""``, ``0`` or float NaN).
    """
    # Float NaN (how pandas represents an empty CSV cell) is truthy, so it
    # must be rejected explicitly or it would become the bogus key "nan".
    if not value or (isinstance(value, float) and math.isnan(value)):
        return ""
    normalized = unidecode(str(value))
    normalized = FEAT_PATTERN.sub("", normalized)
    normalized = REMIX_PATTERN.sub("", normalized)
    # Each run of non-alphanumerics collapses to exactly one space, so no
    # further whitespace squeezing is needed afterwards.
    normalized = NON_ALNUM.sub(" ", normalized.lower())
    return normalized.strip()

def primary_artist(artists_field: Optional[str]) -> str:
    """Return the first entry of a ``;``-separated artist field.

    Surrounding whitespace is trimmed. Empty values and anything that is not
    a string yield ``""``.
    """
    if artists_field and isinstance(artists_field, str):
        lead, _, _ = artists_field.partition(';')
        return lead.strip()
    return ""

def friendly_playlist_name(csv_path: Path) -> str:
    """Turn a playlist CSV path into a display name.

    Uses the file name without its extension, with every underscore swapped
    for a space and the ends trimmed.
    """
    words = csv_path.stem.split('_')
    return ' '.join(words).strip()

def duration_ms_from_audio(audio_obj) -> Optional[int]:
    """Return the track length in whole milliseconds, if it is known.

    Args:
        audio_obj: An object exposing ``info.length`` in seconds (such as the
            value returned by ``MutagenFile``), or ``None``.

    Returns:
        ``info.length`` converted to milliseconds and rounded, or ``None``
        when the object, its ``info`` or a non-zero length is unavailable.
    """
    # Compare against None instead of relying on truthiness: a dict-like
    # audio object with no tags has length 0 and is therefore falsy, even
    # though its stream info (and duration) is perfectly valid.
    if audio_obj is None:
        return None
    length = getattr(getattr(audio_obj, 'info', None), 'length', None)
    if not length:
        return None
    return int(round(length * 1000))

## 3. Scan the downloaded library
Create or refresh an auditable `library_index.csv` capturing metadata for every audio file under `Downloaded/`.

In [None]:
def _first_tag_text(tag_value) -> Optional[str]:
    """Return the first non-empty text value of a tag entry, or ``None``.

    Accepts the three shapes a tag lookup can produce here: an object with a
    ``text`` list, a plain list/tuple of values, or a bare string.
    """
    values = getattr(tag_value, 'text', tag_value)
    if isinstance(values, str):
        return values or None
    if isinstance(values, (list, tuple)) and values:
        return str(values[0]) or None
    return None


def scan_downloaded_library(download_root: Path) -> pd.DataFrame:
    """Index every audio file found under *download_root*.

    Files are selected by suffix (``AUDIO_EXTENSIONS``) and read with
    ``MutagenFile``. Artist, title and album come from the ``TPE1``/``artist``,
    ``TIT2``/``title`` and ``TALB``/``album`` tags; a missing artist falls
    back to the parent folder name and a missing title to the file stem.

    Args:
        download_root: Root folder of the local library.

    Returns:
        A DataFrame with the columns ``file_path`` (POSIX path relative to
        *download_root*), ``artist_raw``, ``title_raw``, ``album_raw``,
        ``artist_canonical``, ``title_canonical`` and ``duration_ms``, sorted
        by canonical artist, canonical title and path. The frame is empty
        (and has no columns) when the folder is missing or holds no audio.
    """
    if not download_root.exists():
        print(f"Download directory not found: {download_root}")
        return pd.DataFrame()

    records = []
    for file_path in download_root.rglob('*'):
        if not file_path.is_file() or file_path.suffix.lower() not in AUDIO_EXTENSIONS:
            continue
        try:
            audio = MutagenFile(file_path)
        except Exception as exc:
            # Best effort: an unreadable file is still indexed using the
            # path-based fallbacks below.
            print(f"Failed to read {file_path}: {exc}")
            audio = None

        tags = getattr(audio, 'tags', None)
        artist_str = title_str = album_str = None
        if tags:
            # Tag values may be frame objects, lists of strings or strings;
            # the helper normalizes all of them and tolerates empty entries.
            artist_str = _first_tag_text(tags.get('TPE1') or tags.get('artist'))
            title_str = _first_tag_text(tags.get('TIT2') or tags.get('title'))
            album_str = _first_tag_text(tags.get('TALB') or tags.get('album'))

        # Fallbacks from the path structure
        if not artist_str:
            artist_str = file_path.parent.name
        if not title_str:
            title_str = file_path.stem

        records.append({
            'file_path': file_path.relative_to(download_root).as_posix(),
            'artist_raw': artist_str,
            'title_raw': title_str,
            'album_raw': album_str,
            'artist_canonical': canonicalize_string(artist_str),
            'title_canonical': canonicalize_string(title_str),
            'duration_ms': duration_ms_from_audio(audio),
        })

    df = pd.DataFrame.from_records(records)
    if not df.empty:
        df = df.sort_values(['artist_canonical', 'title_canonical', 'file_path'])
    return df

# Build the index; it is only written to disk and previewed when it has rows.
library_index = scan_downloaded_library(DOWNLOAD_ROOT)
print(f"Indexed {len(library_index):,} local tracks")
if library_index.empty:
    print('Library index is empty – check DOWNLOAD_ROOT or file extensions.')
else:
    library_index.to_csv(LIBRARY_INDEX_CSV, index=False)
    display(library_index.head())

## 4. Load Spotify playlist exports
Combine all CSV files in `spotify_playlists/` into a single DataFrame with helpful flags.

In [None]:
def load_spotify_playlists(csv_root: Path) -> pd.DataFrame:
    """Read every ``*.csv`` in *csv_root* into one combined DataFrame.

    Each file's rows are tagged with ``playlist_name``, ``is_liked`` (the
    file is named ``liked_songs.csv``) and ``is_top_songs`` (the file name
    starts with ``your_top_songs_``); name checks ignore case. The combined
    frame gains ``primary_artist``, ``artist_canonical`` and
    ``title_canonical``, derived from the ``Artist Name(s)`` and
    ``Track Name`` columns.

    Files that cannot be read are reported and skipped. An empty DataFrame
    is returned when the folder is missing or contains no CSV files.
    """
    if not csv_root.exists():
        print(f"Spotify playlist directory not found: {csv_root}")
        return pd.DataFrame()

    csv_files = sorted(csv_root.glob('*.csv'))
    if not csv_files:
        print(f"No CSV files found in {csv_root}")
        return pd.DataFrame()

    frames = []
    for csv_file in csv_files:
        try:
            frame = pd.read_csv(csv_file)
        except Exception as exc:
            print(f"Failed to read {csv_file}: {exc}")
            continue
        lowered_name = csv_file.name.lower()
        frame['playlist_name'] = friendly_playlist_name(csv_file)
        frame['is_liked'] = lowered_name == 'liked_songs.csv'
        frame['is_top_songs'] = lowered_name.startswith('your_top_songs_')
        frames.append(frame)

    if not frames:
        return pd.DataFrame()
    combined = pd.concat(frames, ignore_index=True)
    if combined.empty:
        return combined

    # Matching keys: first listed artist plus the track title, both canonicalized.
    lead_artists = combined['Artist Name(s)'].apply(primary_artist)
    combined['primary_artist'] = lead_artists
    combined['artist_canonical'] = lead_artists.apply(canonicalize_string)
    combined['title_canonical'] = combined['Track Name'].apply(canonicalize_string)
    return combined

# Load all playlist exports into one frame.
spotify_df = load_spotify_playlists(SPOTIFY_PLAYLISTS)
# The playlist count falls back to 0 because an empty result has no
# 'playlist_name' column to count.
print(f"Loaded {len(spotify_df):,} Spotify rows across {spotify_df['playlist_name'].nunique() if not spotify_df.empty else 0} playlists")
if not spotify_df.empty:
    display(spotify_df.head())

## 5. Match Spotify tracks to the local library
Left-join on canonical artist/title keys and filter by duration tolerance where available.

In [None]:
# Largest Spotify-vs-local duration difference (in milliseconds) that is
# still accepted as the same track.
DURATION_TOLERANCE_MS = 3000

def match_tracks(spotify_df: pd.DataFrame, library_df: pd.DataFrame, duration_tolerance_ms: int = DURATION_TOLERANCE_MS) -> pd.DataFrame:
    """Left-join Spotify rows to local files on the canonical artist/title keys.

    Args:
        spotify_df: Playlist rows with ``artist_canonical`` and
            ``title_canonical`` columns, and optionally ``Duration (ms)``.
        library_df: Local library index with the same key columns plus
            ``file_path`` and ``duration_ms``.
        duration_tolerance_ms: A match is rejected when both durations are
            known and differ by more than this many milliseconds.

    Returns:
        One row per Spotify row and matching local file (a Spotify row that
        matches several files appears once per file). ``file_path`` and
        ``duration_ms_local`` are null for rows without an accepted match.
        An empty DataFrame is returned when *spotify_df* is empty.
    """
    if spotify_df.empty:
        return pd.DataFrame()
    if library_df.empty:
        result = spotify_df.copy()
        result['file_path'] = pd.NA
        result['duration_ms_local'] = pd.NA
        return result

    lib_cols = library_df.rename(columns={'duration_ms': 'duration_ms_local'})
    merged = spotify_df.merge(lib_cols, how='left', on=['artist_canonical', 'title_canonical'], suffixes=('_spotify', '_local'))

    # Without both duration columns there is nothing to compare.
    if 'duration_ms_local' not in merged.columns or 'Duration (ms)' not in merged.columns:
        return merged

    # Coerce to float so missing or non-numeric durations become NaN; NaN
    # never compares greater than the tolerance, so rows with an unknown
    # duration on either side keep their match.
    spotify_ms = pd.to_numeric(merged['Duration (ms)'], errors='coerce').astype('float64')
    local_ms = pd.to_numeric(merged['duration_ms_local'], errors='coerce').astype('float64')
    # The comparison is evaluated on the difference itself. Combining it with
    # a mask via ``&`` without parentheses would bind ``&`` first and compare
    # the wrong value against the tolerance.
    mismatched = (spotify_ms - local_ms).abs() > duration_tolerance_ms

    # Blank out rejected matches column by column so each keeps a valid dtype.
    merged['file_path'] = merged['file_path'].mask(mismatched, pd.NA)
    merged['duration_ms_local'] = merged['duration_ms_local'].mask(mismatched)
    return merged

# Join the playlist rows against the local library index.
matched_df = match_tracks(spotify_df, library_index)
# This is the total number of joined rows, with or without a local file.
print(f"Matched rows: {len(matched_df):,}")
if not matched_df.empty:
    # A row counts as downloaded when the join left it with a file path.
    have_files = matched_df['file_path'].notna().sum()
    print(f"Tracks already downloaded: {have_files:,}")
    print(f"Tracks missing locally: {len(matched_df) - have_files:,}")
    display(matched_df.head())

## 6. Generate a dated shopping list
Aggregate missing tracks across playlists and export a timestamped CSV in `shopping_lists/`.

In [None]:
def _first_non_null(col: pd.Series):
    """Return the first non-null entry of *col*, or ``pd.NA`` if there is none."""
    present = col.dropna()
    # Check for presence, not truthiness: a falsy value is still a value.
    return pd.NA if present.empty else present.iloc[0]


def build_shopping_list(matched_df: pd.DataFrame) -> pd.DataFrame:
    """Summarize the tracks that have no local file, one row per track.

    Rows of *matched_df* whose ``file_path`` is null are grouped by the
    canonical artist/title key. Each group keeps its first artist, title and
    duration, its first non-null album, the sorted set of playlists it
    appears in, and whether any occurrence is liked or in a top-songs list.

    Args:
        matched_df: Output of the Spotify/library join; must contain
            ``file_path`` plus the Spotify columns aggregated below.

    Returns:
        A DataFrame with the columns ``Artist``, ``Title``, ``Album``,
        ``Duration_ms``, ``Playlists_Count``, ``Playlists``, ``Is_Liked`` and
        ``Is_Top_Songs``, ordered by playlist count (descending), liked first,
        then artist and title. Empty when nothing is missing.
    """
    if matched_df.empty:
        return pd.DataFrame()
    missing = matched_df[matched_df['file_path'].isna()].copy()
    if missing.empty:
        return pd.DataFrame()

    grouped = (
        missing.groupby(['artist_canonical', 'title_canonical'], as_index=False)
        .agg({
            'primary_artist': 'first',
            'Track Name': 'first',
            'Album Name': _first_non_null,
            'Duration (ms)': 'first',
            'playlist_name': lambda col: sorted(set(col)),
            'is_liked': 'any',
            'is_top_songs': 'any',
        })
    )
    grouped['Playlists_Count'] = grouped['playlist_name'].apply(len)
    grouped['Playlists'] = grouped['playlist_name'].apply(lambda names: '; '.join(names))
    grouped = grouped.rename(columns={
        'primary_artist': 'Artist',
        'Track Name': 'Title',
        'Album Name': 'Album',
        'Duration (ms)': 'Duration_ms',
        'is_liked': 'Is_Liked',
        'is_top_songs': 'Is_Top_Songs',
    })
    columns = ['Artist', 'Title', 'Album', 'Duration_ms', 'Playlists_Count', 'Playlists', 'Is_Liked', 'Is_Top_Songs']
    # Sort a fresh frame instead of sorting a column slice in place.
    return grouped[columns].sort_values(
        ['Playlists_Count', 'Is_Liked', 'Artist', 'Title'],
        ascending=[False, False, True, True],
    )

# Export the missing-track summary to a timestamped CSV when there is one.
shopping_df = build_shopping_list(matched_df)
if not shopping_df.empty:
    timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    shopping_path = SHOPPING_LIST_DIR / f'shopping_list_{timestamp}.csv'
    shopping_df.to_csv(shopping_path, index=False)
    print(f"Shopping list saved to {shopping_path}")
    display(shopping_df.head())
else:
    print('All playlist tracks already exist locally – no shopping list generated.')

## 7. Generate an orphaned-tracks list
Highlight tracks that exist in `Downloaded/` but are not referenced by any current playlist snapshot.

In [None]:
def build_orphaned_tracks(matched_df: pd.DataFrame, library_df: pd.DataFrame) -> pd.DataFrame:
    """List local tracks whose canonical key appears in no playlist row.

    A track is kept when its (``artist_canonical``, ``title_canonical``)
    pair is absent from *matched_df*. The result has the columns ``Artist``,
    ``Title``, ``Album``, ``Duration_ms`` and ``file_path``. An empty
    DataFrame is returned when the library is empty or nothing is orphaned.
    """
    if library_df.empty:
        return pd.DataFrame()

    referenced = set()
    if not matched_df.empty:
        referenced = set(zip(matched_df['artist_canonical'], matched_df['title_canonical']))

    local = library_df.copy()
    local['key'] = list(zip(local['artist_canonical'], local['title_canonical']))
    unreferenced = [pair not in referenced for pair in local['key']]
    leftovers = local[unreferenced]
    if leftovers.empty:
        return pd.DataFrame()

    renamed = leftovers.rename(columns={
        'artist_raw': 'Artist',
        'title_raw': 'Title',
        'album_raw': 'Album',
        'duration_ms': 'Duration_ms',
    })
    return renamed[['Artist', 'Title', 'Album', 'Duration_ms', 'file_path']]

# Export the orphaned-track report to a timestamped CSV when there is one.
orphan_df = build_orphaned_tracks(matched_df, library_index)
if not orphan_df.empty:
    timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    orphan_path = SHOPPING_LIST_DIR / f'orphaned_tracks_{timestamp}.csv'
    orphan_df.to_csv(orphan_path, index=False)
    print(f"Orphaned-track report saved to {orphan_path}")
    display(orphan_df.head())
else:
    print('No orphaned tracks – every local track appears in at least one playlist snapshot.')

## 8. Show Playlist Statistics
Summarize key statistics about each playlist, including total tracks, matched tracks, missing tracks, and orphaned tracks.

## 9. Generate Playlists
Build on these DataFrames to generate Innioasis Y1 playlist files (`.m3u8`) containing all the real tracks.