The goal of this notebook is to split the musicmap data (both audio files and features) into training, validation, and testing sets.

We'll target a 70-20-10 split.

Note that different songs have wildly different lengths. The shortest are under 1 minute, and others are over an hour.

We'd like to have a relatively uniform distribution of clips among genres and, ideally, songs within genre. So we've decided to limit the number of clips to 4 (randomly selected) per song. A few genres will be a bit short.

The code below will:
1. Find all audio files in each genre subdirectory (only files directly inside a genre folder are picked up).
2. Randomly split each genre into 70% train, 20% validation, 10% test.
3. Create a CSV listing the file path and its assigned split.
4. Copy files into new train/, val/, and test/ folders while preserving the subdirectory (genre) structure.

First, we'll generate the splits and create the CSV files.

In [6]:
# generate_splits.py

import os
import random
import csv
from pathlib import Path
import subprocess
import numpy as np
import librosa



In [None]:
# === CONFIGURATION ===
# Root of the downloaded audio; each immediate subfolder is treated as one genre.
source_dir = Path('Musicmap_Improved_Download')
output_base = Path('Musicmap_Dataset_Split')                 # Will contain train/val/test folders
# Divisor applied to each file's duration to count its segments
# (presumably seconds, matching the duration ffprobe reports — TODO confirm).
segment_length = 30
# File suffixes (compared in lower case) that count as audio.
audio_extensions = {'.mp3', '.wav', '.flac', '.m4a'}
# Share of each genre's files per split; 'test' receives whatever is left
# after the train and val counts are truncated to whole files.
split_ratios = {'train': 0.7, 'val': 0.2, 'test': 0.1}
# Upper bound recorded as num_selected_segments for every song.
max_segments_per_song = 4

# libaudio is deprecated in librosa, and ffmpeg is slow because it loads the entire audio file into memory
# using ffprobe instead
def get_duration_ffprobe(path):
    """Return the duration ffprobe reports for an audio file.

    ffprobe is asked for the container's ``format=duration`` entry only, so
    the audio stream itself is not decoded.

    Args:
        path: Location of the audio file (``str`` or ``Path``).

    Returns:
        The reported duration as a float, or 0 when ffprobe cannot be
        started, exits with a non-zero status, or prints something that is
        not a number (for example ``N/A``).
    """
    command = [
        'ffprobe', '-v', 'error', '-show_entries',
        'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(path),
    ]
    try:
        # stderr is captured separately: ffprobe may print diagnostics for a
        # file whose duration it still reports, and merging both streams
        # would make float() reject an otherwise valid answer.
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode != 0:
            details = result.stderr.decode('utf-8', errors='replace').strip()
            print(f"ffprobe failed on {path}: {details}")
            return 0
        return float(result.stdout.strip())
    except (OSError, ValueError) as e:
        # OSError: ffprobe missing or not executable.
        # ValueError: empty or non-numeric output, or an invalid argument.
        print(f"ffprobe failed on {path}: {e}")
        return 0

#def get_duration_ffmpeg(path, sr=22050):
#    try:
#        command = [
#            'ffmpeg', '-i', str(path),
#            '-f', 'f32le', '-acodec', 'pcm_f32le',
#            '-ac', '1', '-ar', str(sr), '-loglevel', 'quiet', '-'
#        ]
#        out = subprocess.check_output(command)
#        y = np.frombuffer(out, dtype=np.float32)
#        return librosa.get_duration(y=y, sr=sr)
#    except Exception as e:
#        print(f"ffmpeg decode failed on {path}: {e}")
#        return 0

# === STEP 1: Gather files by genre ===
# Maps a genre (the name of an immediate subfolder of source_dir) to the audio
# files found directly inside it. Genres without any audio file are left out.
genre_to_files = {}
for genre_dir in source_dir.iterdir():
    if not genre_dir.is_dir():
        continue
    files = [
        entry
        for entry in genre_dir.glob('*')
        if entry.suffix.lower() in audio_extensions
    ]
    if not files:
        continue
    genre_to_files[genre_dir.name] = files

# === STEP 2: Shuffle and split ===
# One dict per audio file: its split, genre, path relative to source_dir,
# stem, and how many segments it holds / how many of them may be used.
file_records = []

for genre, files in genre_to_files.items():
    # Shuffle in place, then cut the list into three consecutive slices.
    random.shuffle(files)
    n = len(files)
    train_end = int(n * split_ratios['train'])
    val_end = train_end + int(n * split_ratios['val'])

    split_data = {
        'train': files[:train_end],
        'val': files[train_end:val_end],
        # Everything left over once the two truncated counts are taken.
        'test': files[val_end:],
    }

    for split_name, file_list in split_data.items():
        for file_path in file_list:
            # Number of whole segment_length-sized pieces in the file.
            num_segments = int(get_duration_ffprobe(file_path) // segment_length)

            file_records.append({
                'split': split_name,
                'genre': genre,
                'file_path': str(file_path.relative_to(source_dir)),
                'filename': file_path.stem,
                'num_segments': num_segments,
                'num_selected_segments': min(num_segments, max_segments_per_song),
            })

# === STEP 3: Sort and write CSVs ===
output_base.mkdir(exist_ok=True)
file_records.sort(key=lambda r: (r['genre'], r['split']))

fieldnames = ['split', 'genre', 'file_path', 'filename', 'num_segments', 'num_selected_segments']


def _write_records(csv_name, rows):
    """Write *rows* with a header line to ``output_base / csv_name``."""
    with open(output_base / csv_name, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


# Write full CSV
_write_records('file_split.csv', file_records)

# Write split CSVs
for split in ['train', 'val', 'test']:
    _write_records(
        f'file_split_{split}.csv',
        [r for r in file_records if r['split'] == split],
    )

print(f"CSV files written to {output_base}")

If desired, we can copy the audio files. But all we really need are the features, so this step can be skipped.

In [None]:
# copy_files_from_csv.py

import csv
import shutil
from pathlib import Path

# === CONFIGURATION ===
source_dir = Path('Musicmap_Improved_Download')
csv_path = Path('Musicmap_Dataset_Split/file_split.csv')  # Or file_split_train.csv etc.
destination_base = Path('Musicmap_Dataset_Split')         # Will contain train/, val/, test/

# === Read CSV ===
with open(csv_path, newline='', encoding='utf-8') as f:
    records = list(csv.DictReader(f))

# === Copy Files ===
# Each row lands in <destination_base>/<split>/<genre>/<original file name>.
for record in records:
    split_dir = destination_base / record['split'] / record['genre']
    split_dir.mkdir(parents=True, exist_ok=True)

    src = source_dir / record['file_path']
    if src.exists():
        dst = split_dir / src.name
        try:
            shutil.copy2(src, dst)
        except Exception as e:
            # Best effort: report the failure and carry on with the next row.
            print(f"Failed to copy {src} → {dst}: {e}")
    else:
        print(f"Missing: {src}")

print(f"\nFiles copied based on {csv_path}")


Now we'll create the splits for the features. This will:

1. Read the file_split.csv (or the separate train/val/test CSVs) generated above.
2. For each feature directory (e.g., chroma_bs_gray, hpss_mean, etc.):
    1. Iterate over genres and filenames.
    2. Match all *_seg*.png segment files associated with each base filename.
    3. Copy those segment files into the corresponding new directory:
        * musicmap_processed_output_train/<feature>/<genre>/...
        * musicmap_processed_output_val/<feature>/<genre>/...
        * musicmap_processed_output_test/<feature>/<genre>/...
3. For the features_csv directory:
    1. Load each CSV file.
    2. Filter rows based on the segment names found in file_split.csv.
    3. Save the filtered CSVs into new folders.

In [8]:
# copy_processed_segments.py

import csv
import shutil
from pathlib import Path
import glob
import pandas as pd

# random.sample is used below; importing it here lets this cell run without
# depending on an import made in an earlier cell.
import random

# === CONFIGURATION ===
csv_path = Path("Musicmap_Dataset_Split/file_split_15sec.csv")
processed_input_root = Path("15_second_features/musicmap_processed_output")
output_root_base = Path("15_second_features/musicmap_processed_output_splits")  # Will create _train, _val, _test subdirs

# === Read CSV ===
with open(csv_path, newline='', encoding='utf-8') as f:
    records = list(csv.DictReader(f))

# === Collect feature directories ===
feature_dirs = [d for d in processed_input_root.iterdir() if d.is_dir()]
print(f"Found feature directories: {[d.name for d in feature_dirs]}")
# features_csv holds tables rather than images and is handled separately below.
image_feature_dirs = [d for d in feature_dirs if d.name != "features_csv"]

# Map: split → set of selected segment names (e.g., "tec05-2-Artist-Title_seg0").
# It is filled while the images are selected so that the CSV filter further
# down keeps exactly the segments whose images were copied.
split_to_segments = {'train': set(), 'val': set(), 'test': set()}

# Each genre folder is listed once and reused for every song in it, instead
# of being globbed again for every song × feature combination.
png_names_by_dir = {}

# === Copy PNGs from image-based features ===
for record in records:
    split = record["split"]
    genre = record["genre"]
    base_filename = record["filename"]
    num_segs = int(record.get("num_selected_segments", record.get("num_segments", 0)))
    split_segments = split_to_segments[split]
    prefix = f"{base_filename}_seg"

    # Segment file names chosen for this song. They are drawn once, from the
    # first feature directory that contains the song, and reused for all
    # other features so every feature describes the same clips.
    selected_names = None

    for feature_dir in image_feature_dirs:
        source_genre_dir = feature_dir / genre
        if source_genre_dir not in png_names_by_dir:
            png_names_by_dir[source_genre_dir] = (
                sorted(p.name for p in source_genre_dir.glob("*.png"))
                if source_genre_dir.exists() else None
            )
        png_names = png_names_by_dir[source_genre_dir]
        if png_names is None:
            print(f"[WARNING] Missing genre directory: {source_genre_dir}")
            continue

        available = [name for name in png_names if name.startswith(prefix)]
        if not available:
            print(f"[WARNING] No segments found for {base_filename} in {source_genre_dir}")
            continue

        if selected_names is None:
            selected_names = (
                random.sample(available, num_segs)
                if len(available) > num_segs else available
            )
        if not selected_names:
            # Zero segments were requested for this song: nothing to copy.
            continue

        dest_genre_dir = output_root_base.with_name(output_root_base.name + f"_{split}") / feature_dir.name / genre
        dest_genre_dir.mkdir(parents=True, exist_ok=True)

        present = set(available)
        for name in selected_names:
            seg_file_path = source_genre_dir / name
            dest_path = dest_genre_dir / name
            if name not in present:
                print(f"[WARNING] Missing file: {seg_file_path}")
                continue
            try:
                shutil.copy2(seg_file_path, dest_path)
            except OSError as e:
                print(f"[ERROR] Failed to copy {seg_file_path} → {dest_path}: {e}")

    if selected_names is None:
        # No image feature contains this song; fall back to the first
        # num_segs segment names so its CSV rows are still assigned.
        split_segments.update(f"{base_filename}_seg{i}" for i in range(num_segs))
    else:
        split_segments.update(name[:-len(".png")] for name in selected_names)

# === Handle features_csv filtering ===
csv_feature_dir = processed_input_root / "features_csv"
if csv_feature_dir.exists():
    for csv_file in csv_feature_dir.glob("*.csv"):
        df = pd.read_csv(csv_file)
        if "segment" not in df.columns:
            print(f"[WARNING] Skipping CSV without 'segment' column: {csv_file}")
            continue

        for split, segments in split_to_segments.items():
            filtered_df = df[df["segment"].isin(segments)].copy()
            if not filtered_df.empty:
                out_dir = output_root_base.with_name(output_root_base.name + f"_{split}") / "features_csv"
                out_dir.mkdir(parents=True, exist_ok=True)
                out_path = out_dir / csv_file.name
                filtered_df.to_csv(out_path, index=False)
                print(f"✓ Wrote filtered CSV to {out_path}")

print("Segment and feature CSV copying completed.")


Found feature directories: ['chroma_cq_gray', 'features_csv', 'chroma_gray', 'hpss_mean', 'hpss_median', 'resnet_mel_rgb', 'mfcc_gray', 'chroma_bs_gray', 'mfcc_plot', 'mel_pcen_gray', 'mel_db_gray']
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_train/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_val/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_test/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_train/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_val/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_test/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features/musicmap_processed_output_splits_train/features_csv/tonnetz_0.csv
✓ Wrote filtered CSV to 15_second_features/m

<h3>Enhanced version with dry-run</h3>

The version below doesn't work quite right - it randomly selects a different collection of files for each feature folder, rather than ensuring consistency in files across the features.

In [9]:
import csv
import shutil
import random
import argparse
from pathlib import Path
import pandas as pd
import sys

def main(dry_run=False):
    """Copy per-segment feature images and feature CSV rows into per-split folders.

    For every song listed in the split CSV, at most ``num_selected_segments``
    segment PNGs are kept, picked at random when more are available. The pick
    is made once per song, from the first image feature directory that
    contains the song, and the same file names are reused for every other
    feature directory and for the ``features_csv`` row filter.

    Note: the selection used to be re-drawn for every feature directory, so
    the features of one split described different clips, and the CSV filter
    ignored the random pick altogether. Both now share one selection.

    Args:
        dry_run: When True nothing is created or copied; the planned actions
            and all warnings are written to ``dry_run_log.txt`` instead.
    """
    # === CONFIGURATION ===
    csv_path = Path("Musicmap_Dataset_Split/file_split_15sec.csv")
    processed_input_root = Path("15_second_features_augmented/musicmap_processed_output")
    output_root_base = Path("15_second_features_augmented/musicmap_processed_output_splits")  # Will create _train, _val, _test subdirs

    dry_run_log = []

    def warn(msg):
        # Warnings are shown immediately and also kept for the dry-run log.
        print(msg)
        dry_run_log.append(msg)

    def split_root(split):
        # Sibling of output_root_base with "_<split>" appended to its name.
        return output_root_base.with_name(output_root_base.name + f"_{split}")

    # === Read CSV ===
    with open(csv_path, newline='', encoding='utf-8') as f:
        records = list(csv.DictReader(f))

    # === Collect feature directories ===
    feature_dirs = [d for d in processed_input_root.iterdir() if d.is_dir()]
    print(f"Found feature directories: {[d.name for d in feature_dirs]}")
    # features_csv holds tables rather than images and is handled separately.
    image_feature_dirs = [d for d in feature_dirs if d.name != "features_csv"]

    # Map: split → set of selected segment names (e.g., "tec05-2-Artist-Title_seg0")
    split_to_segments = {'train': set(), 'val': set(), 'test': set()}

    # Each genre folder is listed once and reused for every song in it,
    # instead of being globbed again for every song × feature combination.
    png_names_by_dir = {}

    # === Copy PNGs from image-based features ===
    for record in records:
        split = record["split"]
        genre = record["genre"]
        base_filename = record["filename"]
        num_segs = int(record.get("num_selected_segments", record.get("num_segments", 0)))
        split_segments = split_to_segments[split]
        prefix = f"{base_filename}_seg"

        # File names chosen for this song; drawn on first use, then reused.
        selected_names = None

        for feature_dir in image_feature_dirs:
            source_genre_dir = feature_dir / genre
            if source_genre_dir not in png_names_by_dir:
                png_names_by_dir[source_genre_dir] = (
                    sorted(p.name for p in source_genre_dir.glob("*.png"))
                    if source_genre_dir.exists() else None
                )
            png_names = png_names_by_dir[source_genre_dir]
            if png_names is None:
                warn(f"[WARNING] Missing genre directory: {source_genre_dir}")
                continue

            available = [name for name in png_names if name.startswith(prefix)]
            if not available:
                warn(f"[WARNING] No segments found for {base_filename} in {source_genre_dir}")
                continue

            if selected_names is None:
                selected_names = (
                    random.sample(available, num_segs)
                    if len(available) > num_segs else available
                )

            dest_genre_dir = split_root(split) / feature_dir.name / genre
            if not dry_run:
                dest_genre_dir.mkdir(parents=True, exist_ok=True)

            present = set(available)
            for name in selected_names:
                seg_file_path = source_genre_dir / name
                dest_path = dest_genre_dir / name
                if name not in present:
                    warn(f"[WARNING] Missing file: {seg_file_path}")
                    continue
                if dry_run:
                    dry_run_log.append(f"[DRY-RUN] Would copy {seg_file_path} → {dest_path}")
                else:
                    try:
                        shutil.copy2(seg_file_path, dest_path)
                    except OSError as e:
                        print(f"[ERROR] Failed to copy {seg_file_path} → {dest_path}: {e}")

        if selected_names is None:
            # No image feature contains this song; fall back to the first
            # num_segs segment names so its CSV rows are still assigned.
            split_segments.update(f"{base_filename}_seg{i}" for i in range(num_segs))
        else:
            split_segments.update(name[:-len(".png")] for name in selected_names)

    # === Handle features_csv filtering ===
    csv_feature_dir = processed_input_root / "features_csv"
    if csv_feature_dir.exists():
        for csv_file in csv_feature_dir.glob("*.csv"):
            df = pd.read_csv(csv_file)
            if "segment" not in df.columns:
                warn(f"[WARNING] Skipping CSV without 'segment' column: {csv_file}")
                continue

            for split, segments in split_to_segments.items():
                filtered_df = df[df["segment"].isin(segments)].copy()
                if not filtered_df.empty:
                    out_dir = split_root(split) / "features_csv"
                    out_path = out_dir / csv_file.name
                    if dry_run:
                        dry_run_log.append(f"[DRY-RUN] Would write filtered CSV to {out_path}")
                    else:
                        out_dir.mkdir(parents=True, exist_ok=True)
                        filtered_df.to_csv(out_path, index=False)
                        print(f"✓ Wrote filtered CSV to {out_path}")

    # === Dry-run summary ===
    if dry_run:
        log_path = Path("dry_run_log.txt")
        with log_path.open("w", encoding="utf-8") as f:
            for line in dry_run_log:
                f.write(line + "\n")
        print(f"\n[DRY-RUN] Completed. Log written to {log_path}")
    else:
        print("Segment and feature CSV copying completed.")


# === Entry Point ===
if __name__ == "__main__":
    main(dry_run=False)  # pass dry_run=True to only log the planned copies to dry_run_log.txt

# Enable below for command line usage
#if __name__ == "__main__":
#    import sys
#    parser = argparse.ArgumentParser()
#    parser.add_argument('--dry-run', action='store_true', help='Only simulate copying, don’t copy files.')
#
#    # Avoid errors in Jupyter or VS Code notebooks by ignoring unknown args
#    args, unknown = parser.parse_known_args()
#
#    dry_run = args.dry_run  # You can also hardcode here: dry_run = True
#    main(dry_run)


Found feature directories: ['chroma_cq_gray', 'features_csv', 'chroma_gray', 'hpss_mean', 'hpss_median', 'resnet_mel_rgb', 'mfcc_gray', 'chroma_bs_gray', 'mfcc_plot', 'mel_pcen_gray', 'mel_db_gray']


KeyboardInterrupt: 

<h2>Enhanced, Corrected Version</h2>

In [10]:
import os
import csv
import random
import shutil
from pathlib import Path
import pandas as pd
import unicodedata

def normalize_filename(s):
    """Return *s* in Unicode NFKC form with surrounding whitespace removed."""
    nfkc_form = unicodedata.normalize("NFKC", s)
    return nfkc_form.strip()

def main(dry_run=False):
    """Copy one consistent random pick of segment images, plus the matching feature rows, into per-split folders.

    For each (genre, filename) in the split CSV, up to ``num_selected_segments``
    segment PNGs are drawn at random from a reference feature directory (the
    first one that is not ``features_csv``). The same file names are then
    copied from every image feature directory, and the ``features_csv`` tables
    are filtered down to the same segments.

    A segment file belongs to a song when its normalized stem starts with
    ``<normalized filename>_seg``. A plain substring test is not used because
    it would also pick up segments of any other song whose name merely
    contains this one.

    Args:
        dry_run: When True nothing is created or copied; the planned actions
            and all warnings are written to ``dry_run_log.txt`` instead.
    """
    # === CONFIGURATION ===
    csv_path = Path("Musicmap_Dataset_Split/file_split_15sec_augmented.csv")
    processed_input_root = Path("15_second_features_augmented/musicmap_processed_output")
    output_root_base = Path("15_second_features_augmented/musicmap_processed_output_splits")  # Will create _train, _val, _test subdirs

    dry_run_log = []

    def warn(msg):
        # Warnings are shown immediately and also kept for the dry-run log.
        print(msg)
        dry_run_log.append(msg)

    def split_root(split):
        # Sibling of output_root_base with "_<split>" appended to its name.
        return output_root_base.with_name(output_root_base.name + f"_{split}")

    # === Read CSV ===
    with open(csv_path, newline='', encoding='utf-8') as f:
        records = list(csv.DictReader(f))

    # Build dict: (genre, filename) → split, num_segs
    record_lookup = {}
    for r in records:
        record_lookup[(r["genre"], r["filename"])] = {
            "split": r["split"],
            "num_segs": int(r.get("num_selected_segments", r.get("num_segments", 0))),
        }

    # === Collect feature directories ===
    feature_dirs = [d for d in processed_input_root.iterdir() if d.is_dir()]
    print(f"Found feature directories: {[d.name for d in feature_dirs]}")
    image_feature_dirs = [d for d in feature_dirs if d.name != "features_csv"]

    # Use first non-csv feature dir as reference for which segments exist.
    reference_dir = image_feature_dirs[0] if image_feature_dirs else None
    if reference_dir is None and record_lookup:
        print("[ERROR] No reference feature directory found.")
        return

    # === Determine selected segment files per file ===
    selected_files_per_record = {}  # key: (genre, base_filename) → List[Path]

    # genre → [(normalized stem, Path)] for the reference directory, or None
    # when the genre folder is missing. Listed once per genre, not per song.
    candidates_by_genre = {}

    for (genre, base_filename), info in record_lookup.items():
        num_segs = info["num_segs"]

        if genre not in candidates_by_genre:
            source_dir = reference_dir / genre
            candidates_by_genre[genre] = (
                [(normalize_filename(p.stem), p) for p in sorted(source_dir.glob("*_seg*.png"))]
                if source_dir.exists() else None
            )
        candidates = candidates_by_genre[genre]
        if candidates is None:
            warn(f"[WARNING] Missing genre folder in {reference_dir.name}: {genre}")
            continue

        # Both sides are normalized so that differently composed Unicode
        # spellings of the same name still match.
        prefix = f"{normalize_filename(base_filename)}_seg"
        matched_segments = [p for stem, p in candidates if stem.startswith(prefix)]

        if len(matched_segments) < num_segs:
            warn(f"[WARNING] Only {len(matched_segments)} segments found for {base_filename}, requested {num_segs}")

        selected = random.sample(matched_segments, min(len(matched_segments), num_segs))
        selected_files_per_record[(genre, base_filename)] = selected

    # === Copy selected segments to each feature directory ===
    for (genre, base_filename), selected_files in selected_files_per_record.items():
        split = record_lookup[(genre, base_filename)]["split"]

        for feature_dir in image_feature_dirs:
            source_genre_dir = feature_dir / genre
            dest_genre_dir = split_root(split) / feature_dir.name / genre

            if not dry_run:
                dest_genre_dir.mkdir(parents=True, exist_ok=True)

            for seg_file in selected_files:
                # Same file name as in the reference directory, looked up in
                # this feature's own genre folder.
                src_path = source_genre_dir / seg_file.name
                dest_path = dest_genre_dir / seg_file.name
                if not src_path.exists():
                    warn(f"[WARNING] Missing file: {src_path}")
                    continue

                if dry_run:
                    dry_run_log.append(f"[DRY-RUN] Would copy {src_path} → {dest_path}")
                else:
                    try:
                        shutil.copy2(src_path, dest_path)
                    except OSError as e:
                        print(f"[ERROR] Failed to copy {src_path} → {dest_path}: {e}")

    # === Handle features_csv filtering ===
    # Segment names (file stems) selected for each split; built once and
    # reused for every CSV file.
    segments_by_split = {'train': set(), 'val': set(), 'test': set()}
    for key, segs in selected_files_per_record.items():
        split = record_lookup[key]["split"]
        if split in segments_by_split:
            segments_by_split[split].update(seg.stem for seg in segs)

    csv_feature_dir = processed_input_root / "features_csv"
    if csv_feature_dir.exists():
        for csv_file in csv_feature_dir.glob("*.csv"):
            df = pd.read_csv(csv_file)
            if "segment" not in df.columns:
                warn(f"[WARNING] Skipping CSV without 'segment' column: {csv_file}")
                continue

            for split, segments in segments_by_split.items():
                filtered_df = df[df["segment"].isin(segments)].copy()
                if not filtered_df.empty:
                    out_dir = split_root(split) / "features_csv"
                    out_path = out_dir / csv_file.name
                    if dry_run:
                        dry_run_log.append(f"[DRY-RUN] Would write filtered CSV to {out_path}")
                    else:
                        out_dir.mkdir(parents=True, exist_ok=True)
                        filtered_df.to_csv(out_path, index=False)
                        print(f"✓ Wrote filtered CSV to {out_path}")

    # === Dry-run summary ===
    if dry_run:
        log_path = Path("dry_run_log.txt")
        with log_path.open("w", encoding="utf-8") as f:
            for line in dry_run_log:
                f.write(line + "\n")
        print(f"\n[DRY-RUN] Completed. Log written to {log_path}")
    else:
        print("Segment and feature CSV copying completed.")


# === Entry Point ===
if __name__ == "__main__":
    main(dry_run=False)  # False copies files for real; use dry_run=True to only write dry_run_log.txt


# Enable below for command line usage
#if __name__ == "__main__":
#    import sys
#    parser = argparse.ArgumentParser()
#    parser.add_argument('--dry-run', action='store_true', help='Only simulate copying, don’t copy files.')
#
#    # Avoid errors in Jupyter or VS Code notebooks by ignoring unknown args
#    args, unknown = parser.parse_known_args()
#
#    dry_run = args.dry_run  # You can also hardcode here: dry_run = True
#    main(dry_run)


Found feature directories: ['chroma_cq_gray', 'features_csv', 'chroma_gray', 'hpss_mean', 'hpss_median', 'resnet_mel_rgb', 'mfcc_gray', 'chroma_bs_gray', 'mfcc_plot', 'mel_pcen_gray', 'mel_db_gray']
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_train/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_val/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_test/features_csv/tonnetz_3.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_train/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_val/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_test/features_csv/tonnetz_2.csv
✓ Wrote filtered CSV to 15_second_features_augmented/musicmap_processed_output_splits_train/f