In [None]:
!pip install essentia

Collecting essentia
  Downloading essentia-2.1b6.dev1389-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.0 kB)
Downloading essentia-2.1b6.dev1389-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m74.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: essentia
Successfully installed essentia-2.1b6.dev1389


In [None]:
import json
import essentia.standard as es
import os
import essentia

In [None]:
# Print the working directory; the relative "music/..." paths used below resolve against it.
!pwd

/content


In [None]:
# List the working directory to check that the "music" folder read below is present.
!ls

music  sample_data


In [None]:
def extract_transition_features(path, beats_per_phrase=32, n_phrases=4):
    """Extract tempo, key and coarse phrase boundaries for one audio file.

    Essentia's ``MusicExtractor`` is tried first. Anything it fails to provide
    is filled in by the dedicated ``KeyExtractor`` / ``RhythmExtractor2013``
    algorithms, which run on the decoded mono signal.

    Phrase boundaries are a fixed grid: every ``beats_per_phrase``-th detected
    beat is treated as the start of a phrase.

    Args:
        path: Path of the audio file to analyse.
        beats_per_phrase: Number of beats between two phrase starts
            (default 32).
        n_phrases: How many phrase starts to report from the beginning and
            from the end of the track (default 4).

    Returns:
        ``{'song_name': <file name>, 'features': {...}}`` where ``features``
        holds ``bpm``, ``key``, ``scale``, ``key_strength`` (each ``None`` when
        unavailable) plus ``last_phrase_boundaries`` and
        ``first_phrase_boundaries`` as lists of ``"MM:SS"`` strings. Both
        lists are empty when no beats could be detected.

    Raises:
        Whatever ``es.MonoLoader`` raises when a fallback analysis is needed
        and the file cannot be decoded.
    """
    def pool_get(pool, name, default=None):
        # Look up a descriptor in a pool, returning `default` when it is missing.
        if pool is None:
            return default
        try:
            return pool[name]
        except (KeyError, TypeError):
            return default

    def to_list(x):
        # Convert any iterable of numbers into a plain list of Python floats;
        # values that cannot be iterated or converted yield None.
        if x is None:
            return None
        try:
            return [float(v) for v in x]
        except (TypeError, ValueError):
            return None

    aggr_pool = None
    frames_pool = None
    try:
        me = es.MusicExtractor(
            lowlevelStats=['mean', 'stdev'],
            rhythmStats=['mean'],
            tonalStats=['mean']
        )
        # MusicExtractor yields two pools: aggregated statistics and per-frame values.
        aggr_pool, frames_pool = me(path)
    except Exception:
        # Best effort: the dedicated extractors below take over.
        aggr_pool = None
        frames_pool = None

    bpm = pool_get(aggr_pool, 'rhythm.bpm')
    beat_times = pool_get(aggr_pool, 'rhythm.beats_position')
    if beat_times is None:
        beat_times = pool_get(frames_pool, 'rhythm.beats_position')
    beat_times = to_list(beat_times)

    key = pool_get(aggr_pool, 'tonal.key_key')
    scale = pool_get(aggr_pool, 'tonal.key_scale')
    key_strength = pool_get(aggr_pool, 'tonal.key_strength')

    needs_key = key is None or scale is None or key_strength is None
    needs_rhythm = bpm is None or not beat_times

    # Decode the file only when a fallback actually needs the raw signal.
    audio = es.MonoLoader(filename=path)() if (needs_key or needs_rhythm) else None

    if needs_key:
        try:
            k, s, ks = es.KeyExtractor()(audio)
            key, scale, key_strength = k, s, float(ks)
        except Exception:
            # Keep whatever partial key information the pool provided.
            pass

    if needs_rhythm:
        try:
            r_out = es.RhythmExtractor2013(method='multifeature')(audio)
            bpm = float(r_out[0])
            beat_times = list(map(float, r_out[1])) if len(r_out) > 1 else []
        except Exception:
            # No beats available; a tempo found by MusicExtractor is kept.
            beat_times = beat_times or []

    # Both lists are initialised up front so the return statement is valid
    # even when no beats were detected.
    first_phrases = []
    last_phrases = []
    if beat_times and beats_per_phrase > 0 and n_phrases > 0:
        # Every `beats_per_phrase`-th beat is taken as the start of a phrase.
        phrase_boundaries = beat_times[::beats_per_phrase]
        formatted = [f"{int(s)//60:02d}:{int(s)%60:02d}" for s in phrase_boundaries]
        first_phrases = formatted[:n_phrases]
        last_phrases = formatted[-n_phrases:]

    song_name = os.path.basename(path)
    return {'song_name': song_name, 'features': {
        "bpm": float(bpm) if bpm is not None else None,
        "key": key,
        "scale": scale,
        "key_strength": float(key_strength) if key_strength is not None else None,
        "last_phrase_boundaries": last_phrases,
        'first_phrase_boundaries': first_phrases}
    }

In [None]:
# Smoke-test the extractor on a single track before processing the whole folder.
results = extract_transition_features("music/stargazing-kygo.wav")

In [None]:
# Display the feature dictionary returned for the single test track.
results

{'song_name': 'stargazing-kygo.wav',
 'features': {'bpm': 98.84703826904297,
  'key': 'F',
  'scale': 'major',
  'key_strength': 0.9270086884498596,
  'last_phrase_boundaries': ['02:49', '03:08', '03:27', '03:47'],
  'first_phrase_boundaries': ['00:00', '00:17', '00:34', '00:52']}}

In [None]:
# Run the extractor over every WAV file in the music folder and save the
# collected features to results.json.
path = "music"
results = []

# os.listdir returns entries in arbitrary order; sorting keeps the JSON output
# stable between runs. The extension check is case-insensitive so files named
# "*.WAV" are not silently skipped.
for filename in sorted(os.listdir(path)):
    if filename.lower().endswith(".wav"):
        file_path = os.path.join(path, filename)
        output = extract_transition_features(file_path)
        results.append(output)

with open('results.json', 'w') as f:
    json.dump(results, f, indent=4)


In [None]:
# GPT-5 attempt to improve
from typing import Dict, List, Any, Tuple
import math

import numpy as np
import essentia
import essentia.standard as es


def _pool_get(pool, name, default=None):
    if pool is None:
        return default
    try:
        return pool[name]
    except Exception:
        return default


def _load_audio(path: str, sample_rate: int = 44100):
    """Load mono audio with a fixed sample rate (for consistent analysis)."""
    return es.MonoLoader(filename=path, sampleRate=sample_rate)(), sample_rate


def _beats_and_bpm(audio) -> Tuple[float, List[float]]:
    """
    Robust beat+tempo estimator.
    Returns (bpm, beat_times).
    """
    # RhythmExtractor2013 is strong for EDM
    out = es.RhythmExtractor2013(method="multifeature")(audio)
    # Typical order: bpm, beats, confidence, estimates, intervals (varies by build)
    bpm = out[0]
    beats = out[1] if len(out) > 1 else []
    return float(bpm), list(map(float, beats))


def _downbeats(audio) -> List[float]:
    """
    Try to get true downbeats. If it fails (builds can differ), return [] and caller will fall back.
    """
    try:
        db = es.DownbeatTracker(method="dbn")
        db_out = db(audio)
        # Different builds return different shapes; handle defensively.
        # Common: (times, beat_indices) where beat_indices in {1..4} for 4/4
        # We'll interpret "beat index == 1" as downbeat (bar start).
        if isinstance(db_out, tuple) and len(db_out) >= 2:
            times, beat_positions = db_out[0], db_out[1]
            times = list(map(float, times))
            beat_positions = list(map(int, beat_positions))
            return [t for t, b in zip(times, beat_positions) if b == 1]
        # Some builds may already give just downbeat times:
        if isinstance(db_out, (list, np.ndarray)) and len(db_out) and np.ndim(db_out) == 1:
            return list(map(float, db_out))
    except Exception:
        pass
    return []


def _bars_from_beats(beat_times: List[float], assumed_beats_per_bar: int = 4) -> List[float]:
    """
    Fallback bar starts by assuming 4/4 and using every 4th beat as a bar boundary.
    """
    if not beat_times:
        return []
    return [beat_times[i] for i in range(0, len(beat_times), assumed_beats_per_bar)]


def _phrases_from_bars(bar_times: List[float], bars_per_phrase: int = 8) -> List[float]:
    """
    Phrase boundary candidates from bar starts (8 bars default).
    Returns absolute timestamps for each phrase start.
    """
    if not bar_times:
        return []
    return [bar_times[i] for i in range(0, len(bar_times), bars_per_phrase)]


def _rms_profile(audio: np.ndarray, sr: int, frame_size: int = 2048, hop_size: int = 512):
    """
    Compute RMS (energy) envelope over time.
    Returns (times, rms_values).
    """
    window = es.Windowing(type="hann")
    rms_alg = es.RMS()

    rms_vals = []
    # FrameGenerator yields frames of length frame_size with hop hop_size
    for frame in es.FrameGenerator(audio, frameSize=frame_size, hopSize=hop_size, startFromZero=True):
        rms_vals.append(float(rms_alg(window(frame))))

    rms_vals = np.asarray(rms_vals, dtype=np.float32)
    times = np.arange(len(rms_vals), dtype=np.float32) * (hop_size / float(sr))
    return times, rms_vals


def _score_boundaries_by_energy(boundary_times: List[float],
                                env_times: np.ndarray,
                                env_vals: np.ndarray,
                                pre_window_s: float = 1.0,
                                post_window_s: float = 1.0) -> List[float]:
    """
    Score each boundary by mean RMS(Post) - mean RMS(Pre).
    We want higher energy after a boundary (drop/entry), typical for strong phrase starts in EDM.
    """
    if len(boundary_times) == 0 or len(env_times) == 0:
        return [0.0] * len(boundary_times)

    hop_dt = float(env_times[1] - env_times[0]) if len(env_times) > 1 else 0.01

    scores = []
    for t in boundary_times:
        idx = int(np.searchsorted(env_times, t))
        n_pre = max(1, int(pre_window_s / hop_dt))
        n_post = max(1, int(post_window_s / hop_dt))

        pre_start = max(0, idx - n_pre)
        pre_end = max(pre_start + 1, idx)
        post_start = idx
        post_end = min(len(env_vals), idx + n_post)

        if pre_end <= pre_start or post_end <= post_start:
            scores.append(0.0)
            continue

        pre_mean = float(np.mean(env_vals[pre_start:pre_end]))
        post_mean = float(np.mean(env_vals[post_start:post_end]))
        scores.append(max(0.0, post_mean - pre_mean))  # clamp to [0, inf)
    return scores


def _normalize_scores(scores: List[float]) -> List[float]:
    if not scores:
        return scores
    mx = max(scores)
    if mx <= 1e-12:
        return [0.0] * len(scores)
    return [float(s / mx) for s in scores]


def extract_transition_features_advanced(path: str,
                                         bars_per_phrase: int = 8,
                                         return_n_last: int = 4,
                                         hybrid_mode: bool = True) -> Dict[str, Any]:
    """
    Hybrid phrase detection:
      - Try true downbeats → bars → ``bars_per_phrase``-bar phrases
      - Score phrase boundaries by energy delta (RMS)
      - If downbeats fail or no boundary shows an energy rise, fall back to a
        strict grid built from the beats (4 beats per bar assumed)

    Tempo and key come from MusicExtractor when available, otherwise from
    RhythmExtractor2013 / KeyExtractor run on the decoded signal.

    Args:
        path: Path of the audio file to analyse.
        bars_per_phrase: Number of bars between two phrase starts.
        return_n_last: How many of the final phrase starts to report in
            ``last_phrase_boundaries``; 0 or less reports none.
        hybrid_mode: When False, downbeat tracking is skipped and the beat
            grid is used directly.

    Returns:
      {
        "bpm": float|None,
        "key": str|None,
        "scale": str|None,
        "key_strength": float|None,
        "last_phrase_boundaries": List[str],     # last N phrase starts as "MM:SS", ascending
        "phrase_candidates": List[{"time": float, "score": float, "source": "downbeat+energy"|"grid-fallback"}]
      }
    """
    # ---------- High-level try (MusicExtractor) ----------
    stats_pool = None
    frames_pool = None
    try:
        me = es.MusicExtractor(
            lowlevelStats=['mean', 'stdev'],
            rhythmStats=['mean'],
            tonalStats=['mean']
        )
        extracted = me(path)
        # MusicExtractor yields two pools (aggregated statistics, per-frame
        # values). Indexing that tuple by descriptor name fails for every
        # lookup, so the pools have to be separated first.
        if isinstance(extracted, (tuple, list)):
            stats_pool = extracted[0] if len(extracted) > 0 else None
            frames_pool = extracted[1] if len(extracted) > 1 else None
        else:
            stats_pool = extracted
    except Exception:
        # Best effort: the dedicated extractors below take over.
        stats_pool = None
        frames_pool = None

    bpm = _pool_get(stats_pool, 'rhythm.bpm')
    beat_times = _pool_get(stats_pool, 'rhythm.beats_position')
    if beat_times is None:
        beat_times = _pool_get(frames_pool, 'rhythm.beats_position')
    if beat_times is not None:
        # Normalise to a plain list of floats: the helpers below test the beat
        # list for truthiness, which is ambiguous for array-like values.
        try:
            beat_times = [float(t) for t in beat_times]
        except (TypeError, ValueError):
            beat_times = None

    key = _pool_get(stats_pool, 'tonal.key_key')
    scale = _pool_get(stats_pool, 'tonal.key_scale')
    key_strength = _pool_get(stats_pool, 'tonal.key_strength')

    # ---------- Audio & fallbacks ----------
    audio, sr = _load_audio(path, sample_rate=44100)

    if key is None or scale is None or key_strength is None:
        try:
            k, s, ks = es.KeyExtractor()(audio)
            key, scale, key_strength = k, s, float(ks)
        except Exception:
            # Keep whatever partial key information the pool provided.
            pass

    if bpm is None or beat_times is None or len(beat_times) == 0:
        try:
            bpm, beat_times = _beats_and_bpm(audio)
        except Exception:
            bpm, beat_times = None, []

    # ---------- Downbeat → bars → phrase candidates ----------
    phrase_candidates: List[float] = []
    source = "downbeat+energy"

    downbeat_times = _downbeats(audio) if hybrid_mode else []
    if downbeat_times:
        bars = downbeat_times[:]  # bar starts
    else:
        # fallback bars from beats (assume 4/4)
        bars = _bars_from_beats(beat_times, assumed_beats_per_bar=4)
        source = "grid-fallback"

    phrase_candidates = _phrases_from_bars(bars, bars_per_phrase=bars_per_phrase)

    # ---------- Score candidates by energy delta ----------
    env_t, env_v = _rms_profile(audio, sr, frame_size=2048, hop_size=512)
    raw_scores = _score_boundaries_by_energy(phrase_candidates, env_t, env_v,
                                             pre_window_s=1.0, post_window_s=1.0)
    scores = _normalize_scores(raw_scores)

    # If no downbeat-based boundary shows a clear energy rise, switch to the
    # strict beat grid and score that instead.
    if hybrid_mode and source == "downbeat+energy" and (not scores or max(scores) < 0.15):
        bars_fb = _bars_from_beats(beat_times, assumed_beats_per_bar=4)
        phrase_candidates = _phrases_from_bars(bars_fb, bars_per_phrase=bars_per_phrase)
        raw_scores = _score_boundaries_by_energy(phrase_candidates, env_t, env_v,
                                                 pre_window_s=1.0, post_window_s=1.0)
        scores = _normalize_scores(raw_scores)
        source = "grid-fallback"

    # ---------- Assemble outputs ----------
    # Candidates are already ascending. A non-positive count must be handled
    # explicitly because seq[-0:] would return the whole list.
    if phrase_candidates and return_n_last > 0:
        last_phrase_boundaries = phrase_candidates[-return_n_last:]
    else:
        last_phrase_boundaries = []
    formatted = [f"{int(s)//60:02d}:{int(s)%60:02d}" for s in last_phrase_boundaries]

    return {
        "bpm": float(bpm) if bpm is not None else None,
        "key": key,
        "scale": scale,
        "key_strength": float(key_strength) if key_strength is not None else None,
        "last_phrase_boundaries": formatted,
        "phrase_candidates": [
            {"time": float(t), "score": float(s), "source": source}
            for t, s in zip(phrase_candidates, scores)
        ],
    }


In [None]:
# Run the hybrid extractor on the same track that was used to try the first extractor.
results = extract_transition_features_advanced("music/stargazing-kygo.wav")

In [None]:
# Display the hybrid extractor's output, including the scored phrase candidates.
results

{'bpm': 98.60183715820312,
 'key': 'F',
 'scale': 'major',
 'key_strength': 0.9270086884498596,
 'last_phrase_boundaries': ['02:56', '03:15', '03:35', '03:54'],
 'phrase_candidates': [{'time': 0.6037188172340393,
   'score': 0.025312170840652095,
   'source': 'grid-fallback'},
  {'time': 20.09687042236328, 'score': 0.0, 'source': 'grid-fallback'},
  {'time': 39.47392272949219, 'score': 0.0, 'source': 'grid-fallback'},
  {'time': 58.86258316040039, 'score': 0.0, 'source': 'grid-fallback'},
  {'time': 78.48344421386719, 'score': 1.0, 'source': 'grid-fallback'},
  {'time': 98.266845703125, 'score': 0.0, 'source': 'grid-fallback'},
  {'time': 117.63228607177734,
   'score': 0.7300197976596308,
   'source': 'grid-fallback'},
  {'time': 137.04417419433594,
   'score': 0.7841578129717378,
   'source': 'grid-fallback'},
  {'time': 156.43283081054688,
   'score': 0.5706976032231029,
   'source': 'grid-fallback'},
  {'time': 176.08851623535156, 'score': 0.0, 'source': 'grid-fallback'},
  {'time'

In [None]:
# For comparison, the first extractor (extract_transition_features) reported for this track:
# 'last_phrase_boundaries': ['02:49', '03:08', '03:27', '03:47'],